ibzib commented on a change in pull request #16513:
URL: https://github.com/apache/beam/pull/16513#discussion_r793031310



##########
File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ProjectionPushdownOptimizer.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.ProjectionProducer;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+
+/** See {@link #optimize(Pipeline)}. */
+public class ProjectionPushdownOptimizer {
+
+  /**
+   * Performs all known projection pushdown optimizations in-place on a 
Pipeline.
+   *
+   * <p>A pushdown optimization will be made wherever there is a {@link 
ProjectionProducer} that

Review comment:
       Done.

##########
File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ProjectionPushdownOptimizerTest.java
##########
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor;
+import org.apache.beam.sdk.schemas.ProjectionProducer;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link ProjectionPushdownOptimizer}. */
+@RunWith(JUnit4.class)
+public class ProjectionPushdownOptimizerTest {
+
+  @Test
+  public void testSourceDoesNotImplementPushdownProjector() {
+    Pipeline p = Pipeline.create();
+    SimpleSource source = new 
SimpleSource(FieldAccessDescriptor.withAllFields());
+    p.apply(source)
+        .apply(new 
FieldAccessTransform(FieldAccessDescriptor.withFieldNames("foo", "bar")));
+
+    ProjectionPushdownOptimizer.optimize(p);
+    Assert.assertTrue(pipelineHasTransform(p, source));
+  }
+
+  @Test
+  public void testSimpleProjectionPushdown() {
+    Pipeline p = Pipeline.create();
+    SimpleSourceWithPushdown originalSource =
+        new SimpleSourceWithPushdown(FieldAccessDescriptor.withAllFields());
+    FieldAccessDescriptor downstreamFieldAccess =
+        FieldAccessDescriptor.withFieldNames("foo", "bar");
+    p.apply(originalSource).apply(new 
FieldAccessTransform(downstreamFieldAccess));
+
+    SimpleSourceWithPushdown expectedSource = new 
SimpleSourceWithPushdown(downstreamFieldAccess);
+
+    ProjectionPushdownOptimizer.optimize(p);
+    Assert.assertTrue(pipelineHasTransform(p, expectedSource));
+    Assert.assertFalse(pipelineHasTransform(p, originalSource));
+  }
+
+  @Test
+  public void testBranchedProjectionPushdown() {
+    Pipeline p = Pipeline.create();
+    SimpleSourceWithPushdown originalSource =
+        new SimpleSourceWithPushdown(FieldAccessDescriptor.withAllFields());
+    PCollection<Row> input = p.apply(originalSource);
+    input.apply(new 
FieldAccessTransform(FieldAccessDescriptor.withFieldNames("foo")));
+    input.apply(new 
FieldAccessTransform(FieldAccessDescriptor.withFieldNames("bar")));
+
+    SimpleSourceWithPushdown expectedSource =
+        new 
SimpleSourceWithPushdown(FieldAccessDescriptor.withFieldNames("foo", "bar"));
+
+    ProjectionPushdownOptimizer.optimize(p);
+    Assert.assertTrue(pipelineHasTransform(p, expectedSource));
+    Assert.assertFalse(pipelineHasTransform(p, originalSource));
+  }
+
+  @Test
+  public void testIntermediateProducer() {
+    Pipeline p = Pipeline.create();
+    SimpleSource source = new 
SimpleSource(FieldAccessDescriptor.withAllFields());
+    IntermediateTransformWithPushdown originalT =
+        new 
IntermediateTransformWithPushdown(FieldAccessDescriptor.withAllFields());
+    FieldAccessDescriptor downstreamFieldAccess =
+        FieldAccessDescriptor.withFieldNames("foo", "bar");
+    p.apply(source).apply(originalT).apply(new 
FieldAccessTransform(downstreamFieldAccess));
+
+    // TODO(BEAM-13658) Support pushdown on intermediate transforms.
+    // For now, test that the pushdown optimizer ignores intermediate transforms.

Review comment:
       I wanted to make sure the optimizer wouldn't throw an exception in this 
case, since that would break the user's pipeline (and it was throwing before I 
added this test case and fixed the issue).

##########
File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ProjectionPushdownOptimizerTest.java
##########
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor;
+import org.apache.beam.sdk.schemas.ProjectionProducer;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link ProjectionPushdownOptimizer}. */
+@RunWith(JUnit4.class)
+public class ProjectionPushdownOptimizerTest {
+
+  @Test
+  public void testSourceDoesNotImplementPushdownProjector() {
+    Pipeline p = Pipeline.create();
+    SimpleSource source = new 
SimpleSource(FieldAccessDescriptor.withAllFields());
+    p.apply(source)
+        .apply(new 
FieldAccessTransform(FieldAccessDescriptor.withFieldNames("foo", "bar")));
+
+    ProjectionPushdownOptimizer.optimize(p);
+    Assert.assertTrue(pipelineHasTransform(p, source));
+  }
+
+  @Test
+  public void testSimpleProjectionPushdown() {
+    Pipeline p = Pipeline.create();
+    SimpleSourceWithPushdown originalSource =
+        new SimpleSourceWithPushdown(FieldAccessDescriptor.withAllFields());
+    FieldAccessDescriptor downstreamFieldAccess =
+        FieldAccessDescriptor.withFieldNames("foo", "bar");
+    p.apply(originalSource).apply(new 
FieldAccessTransform(downstreamFieldAccess));
+
+    SimpleSourceWithPushdown expectedSource = new 
SimpleSourceWithPushdown(downstreamFieldAccess);
+
+    ProjectionPushdownOptimizer.optimize(p);
+    Assert.assertTrue(pipelineHasTransform(p, expectedSource));
+    Assert.assertFalse(pipelineHasTransform(p, originalSource));
+  }
+
+  @Test
+  public void testBranchedProjectionPushdown() {
+    Pipeline p = Pipeline.create();
+    SimpleSourceWithPushdown originalSource =
+        new SimpleSourceWithPushdown(FieldAccessDescriptor.withAllFields());
+    PCollection<Row> input = p.apply(originalSource);
+    input.apply(new 
FieldAccessTransform(FieldAccessDescriptor.withFieldNames("foo")));
+    input.apply(new 
FieldAccessTransform(FieldAccessDescriptor.withFieldNames("bar")));
+
+    SimpleSourceWithPushdown expectedSource =
+        new 
SimpleSourceWithPushdown(FieldAccessDescriptor.withFieldNames("foo", "bar"));
+
+    ProjectionPushdownOptimizer.optimize(p);
+    Assert.assertTrue(pipelineHasTransform(p, expectedSource));
+    Assert.assertFalse(pipelineHasTransform(p, originalSource));
+  }
+
+  @Test
+  public void testIntermediateProducer() {
+    Pipeline p = Pipeline.create();
+    SimpleSource source = new 
SimpleSource(FieldAccessDescriptor.withAllFields());
+    IntermediateTransformWithPushdown originalT =
+        new 
IntermediateTransformWithPushdown(FieldAccessDescriptor.withAllFields());
+    FieldAccessDescriptor downstreamFieldAccess =
+        FieldAccessDescriptor.withFieldNames("foo", "bar");
+    p.apply(source).apply(originalT).apply(new 
FieldAccessTransform(downstreamFieldAccess));
+
+    // TODO(BEAM-13658) Support pushdown on intermediate transforms.
+    // For now, test that the pushdown optimizer ignores intermediate transforms.
+    ProjectionPushdownOptimizer.optimize(p);
+    Assert.assertTrue(pipelineHasTransform(p, originalT));
+  }
+
+  @Test
+  public void testMultipleOutputs() {
+    Pipeline p = Pipeline.create();
+    MultipleOutputSourceWithPushdown originalSource =
+        new MultipleOutputSourceWithPushdown(
+            FieldAccessDescriptor.withAllFields(), 
FieldAccessDescriptor.withAllFields());
+    PCollectionTuple inputs = p.apply(originalSource);
+    inputs
+        .get(originalSource.tag1)
+        .apply(new 
FieldAccessTransform(FieldAccessDescriptor.withFieldNames("foo")));
+    inputs
+        .get(originalSource.tag1)
+        .apply(new 
FieldAccessTransform(FieldAccessDescriptor.withFieldNames("bar")));
+    inputs
+        .get(originalSource.tag2)
+        .apply(new 
FieldAccessTransform(FieldAccessDescriptor.withFieldNames("baz")));
+    inputs
+        .get(originalSource.tag2)
+        .apply(new 
FieldAccessTransform(FieldAccessDescriptor.withFieldNames("qux")));
+
+    MultipleOutputSourceWithPushdown expectedSource =
+        new MultipleOutputSourceWithPushdown(
+            FieldAccessDescriptor.withFieldNames("foo", "bar"),
+            FieldAccessDescriptor.withFieldNames("baz", "qux"));
+
+    ProjectionPushdownOptimizer.optimize(p);
+    Assert.assertTrue(pipelineHasTransform(p, expectedSource));
+    Assert.assertFalse(pipelineHasTransform(p, originalSource));
+  }
+
+  private static boolean pipelineHasTransform(Pipeline p, PTransform<?, ?> t) {
+    HasTransformVisitor hasTransformVisitor = new HasTransformVisitor(t);
+    p.traverseTopologically(hasTransformVisitor);
+    return hasTransformVisitor.found;
+  }
+
+  private static class HasTransformVisitor extends Defaults {
+    private final PTransform<?, ?> t;
+    boolean found = false;
+
+    HasTransformVisitor(PTransform<?, ?> t) {
+      this.t = t;
+    }
+
+    @Override
+    public CompositeBehavior enterCompositeTransform(Node node) {
+      if (t.equals(node.getTransform())) {
+        found = true;
+        return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+      }
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+  }
+
+  private static class FieldAccessTransform extends 
PTransform<PCollection<Row>, PCollection<Row>> {
+    private final FieldAccessDescriptor fieldAccessDescriptor;
+
+    FieldAccessTransform(FieldAccessDescriptor fieldAccessDescriptor) {
+      this.fieldAccessDescriptor = fieldAccessDescriptor;
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollection<Row> input) {
+      return input
+          .apply(
+              ParDo.of(
+                  new DoFn<Row, Row>() {
+                    @SuppressWarnings("unused") // used by reflection
+                    @FieldAccess("row")
+                    private final FieldAccessDescriptor fieldAccessDescriptor =
+                        FieldAccessTransform.this.fieldAccessDescriptor;
+
+                    @ProcessElement
+                    public void processElement(
+                        @FieldAccess("row") Row row, OutputReceiver<Row> 
outputReceiver)
+                        throws Exception {
+                      // Do nothing; we don't need to execute this DoFn.
+                    }
+                  }))
+          .setRowSchema(createStringSchema(fieldAccessDescriptor));
+    }
+  }
+
+  private static Schema createStringSchema(FieldAccessDescriptor 
fieldAccessDescriptor) {
+    if (fieldAccessDescriptor.getAllFields()) {
+      return createStringSchema(FieldAccessDescriptor.withFieldNames("foo", 
"bar", "baz", "qux"));
+    }
+    Schema.Builder schemaBuilder = Schema.builder();
+    for (FieldDescriptor field : fieldAccessDescriptor.getFieldsAccessed()) {
+      schemaBuilder.addStringField(field.getFieldName());
+    }
+    return schemaBuilder.build();
+  }
+
+  private abstract static class SchemaTransform<InputT extends PInput>

Review comment:
       Done.

##########
File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ProjectionPushdownOptimizerTest.java
##########
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor;
+import org.apache.beam.sdk.schemas.ProjectionProducer;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link ProjectionPushdownOptimizer}. */
+@RunWith(JUnit4.class)
+public class ProjectionPushdownOptimizerTest {
+
+  @Test
+  public void testSourceDoesNotImplementPushdownProjector() {
+    Pipeline p = Pipeline.create();
+    SimpleSource source = new 
SimpleSource(FieldAccessDescriptor.withAllFields());

Review comment:
       Done.

##########
File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ProjectionPushdownOptimizerTest.java
##########
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor;
+import org.apache.beam.sdk.schemas.ProjectionProducer;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link ProjectionPushdownOptimizer}. */
+@RunWith(JUnit4.class)
+public class ProjectionPushdownOptimizerTest {
+
+  @Test
+  public void testSourceDoesNotImplementPushdownProjector() {
+    Pipeline p = Pipeline.create();
+    SimpleSource source = new 
SimpleSource(FieldAccessDescriptor.withAllFields());
+    p.apply(source)
+        .apply(new 
FieldAccessTransform(FieldAccessDescriptor.withFieldNames("foo", "bar")));
+
+    ProjectionPushdownOptimizer.optimize(p);
+    Assert.assertTrue(pipelineHasTransform(p, source));
+  }
+
+  @Test
+  public void testSimpleProjectionPushdown() {
+    Pipeline p = Pipeline.create();
+    SimpleSourceWithPushdown originalSource =
+        new SimpleSourceWithPushdown(FieldAccessDescriptor.withAllFields());
+    FieldAccessDescriptor downstreamFieldAccess =
+        FieldAccessDescriptor.withFieldNames("foo", "bar");
+    p.apply(originalSource).apply(new 
FieldAccessTransform(downstreamFieldAccess));
+
+    SimpleSourceWithPushdown expectedSource = new 
SimpleSourceWithPushdown(downstreamFieldAccess);
+
+    ProjectionPushdownOptimizer.optimize(p);
+    Assert.assertTrue(pipelineHasTransform(p, expectedSource));
+    Assert.assertFalse(pipelineHasTransform(p, originalSource));
+  }
+
+  @Test
+  public void testBranchedProjectionPushdown() {
+    Pipeline p = Pipeline.create();
+    SimpleSourceWithPushdown originalSource =
+        new SimpleSourceWithPushdown(FieldAccessDescriptor.withAllFields());
+    PCollection<Row> input = p.apply(originalSource);
+    input.apply(new 
FieldAccessTransform(FieldAccessDescriptor.withFieldNames("foo")));
+    input.apply(new 
FieldAccessTransform(FieldAccessDescriptor.withFieldNames("bar")));
+
+    SimpleSourceWithPushdown expectedSource =
+        new 
SimpleSourceWithPushdown(FieldAccessDescriptor.withFieldNames("foo", "bar"));
+
+    ProjectionPushdownOptimizer.optimize(p);
+    Assert.assertTrue(pipelineHasTransform(p, expectedSource));
+    Assert.assertFalse(pipelineHasTransform(p, originalSource));
+  }
+
+  @Test
+  public void testIntermediateProducer() {
+    Pipeline p = Pipeline.create();
+    SimpleSource source = new 
SimpleSource(FieldAccessDescriptor.withAllFields());
+    IntermediateTransformWithPushdown originalT =
+        new 
IntermediateTransformWithPushdown(FieldAccessDescriptor.withAllFields());
+    FieldAccessDescriptor downstreamFieldAccess =
+        FieldAccessDescriptor.withFieldNames("foo", "bar");
+    p.apply(source).apply(originalT).apply(new 
FieldAccessTransform(downstreamFieldAccess));
+
+    // TODO(BEAM-13658) Support pushdown on intermediate transforms.
+    // For now, test that the pushdown optimizer ignores intermediate transforms.
+    ProjectionPushdownOptimizer.optimize(p);
+    Assert.assertTrue(pipelineHasTransform(p, originalT));
+  }
+
+  @Test
+  public void testMultipleOutputs() {
+    Pipeline p = Pipeline.create();
+    MultipleOutputSourceWithPushdown originalSource =
+        new MultipleOutputSourceWithPushdown(
+            FieldAccessDescriptor.withAllFields(), 
FieldAccessDescriptor.withAllFields());

Review comment:
       Done.

##########
File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PCollectionOutputTagVisitor.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.ProjectionProducer;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.BiMap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableBiMap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * {@link PipelineVisitor} to convert projection pushdown targets from 
PCollections to TupleTags.
+ *
+ * <p>For example, if we can do pushdown on PTransform T's output PCollection 
P by rewriting T, we
+ * need to get T's output tag for P. This is necessary because PCollection 
objects are not
+ * instantiated until pipeline construction, but output tags are constants 
that are known before
+ * pipeline construction, so transform authors can identify them in {@link
+ * ProjectionProducer#actuateProjectionPushdown(Map)}.

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to