kennknowles commented on a change in pull request #13118: URL: https://github.com/apache/beam/pull/13118#discussion_r509442928
########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValues.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; + +/** + * <b><i>For internal use. No backwards compatibility guarantees.</i></b> + * + * <p>A primitive value within Beam. + */ +@Internal +public class PValues { + + // Do not instantiate + private PValues() {} + + // For backwards-compatibility, PCollectionView is still a "PValue" to users, which occurs in Review comment: Done ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValues.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; Review comment: I put it here because it is a [companion](https://docs.scala-lang.org/overviews/scala-book/companion-objects.html) to `PValue`. It is conventional for that to be the plural name, in the same package. If you still want me to move it after this explanation, I will. I don't care enough to block on it. ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformReplacements.java ########## @@ -15,20 +15,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core.construction; +package org.apache.beam.sdk.runners; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import java.util.Map; import java.util.Set; -import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; /** */ +@Internal Review comment: Resolved: I am removing the moves from the PR to avoid getting blocked on anything that could reopen discussions. ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValues.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; + +/** + * <b><i>For internal use. No backwards compatibility guarantees.</i></b> + * + * <p>A primitive value within Beam. + */ +@Internal +public class PValues { + + // Do not instantiate + private PValues() {} + + // For backwards-compatibility, PCollectionView is still a "PValue" to users, which occurs in + // three places: + // + // POutput#expand (users can write custom POutputs) + // PInput#expand (users can write custom PInputs) + // PTransform#getAdditionalInputs (users can have their composites report inputs not passed by + // apply()) + // + // These all return Map<TupleTag<?> PValue>. A user's implementation of these methods is permitted + // to return + // either a PCollection or a PCollectionView for each PValue. PCollection's expand to themselves + // and + // PCollectionView expands to the PCollection that it is a view of. + public static Map<TupleTag<?>, PCollection<?>> fullyExpand( + Map<TupleTag<?>, PValue> partiallyExpanded) { + Map<TupleTag<?>, PCollection<?>> result = new LinkedHashMap<>(); + for (Map.Entry<TupleTag<?>, PValue> pvalue : partiallyExpanded.entrySet()) { + if (pvalue.getValue() instanceof PCollection) { + PCollection<?> previous = result.put(pvalue.getKey(), (PCollection<?>) pvalue.getValue()); + checkArgument( + previous == null, + "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s", + partiallyExpanded, + TupleTag.class.getSimpleName(), + pvalue.getKey(), + previous, + pvalue.getValue()); + } else { + if (pvalue.getValue().expand().size() == 1 + && Iterables.getOnlyElement(pvalue.getValue().expand().values()) + .equals(pvalue.getValue())) { + throw new IllegalStateException( + String.format( + "Non %s %s that expands into itself %s", + PCollection.class.getSimpleName(), + PValue.class.getSimpleName(), + pvalue.getValue())); + } + // At this point we know it is a PCollectionView or some internal hacked PValue. To be Review comment: Done ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java ########## @@ -1403,4 +1427,55 @@ public int size() { }; } } + + public static <InputT, ViewT> PCollectionView<ViewT> findPCollectionView( Review comment: I've moved it into the test class where it is used. I would still like to leave the alteration of `DataflowRunner` out of this PR, since my other PR that just adds a `checkState` illustrates that the `DataflowRunner` batch view overrides result in a corrupted graph that sort of works by luck. I don't want to disturb that potentially sensitive situation. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
