lukecwik commented on a change in pull request #13118: URL: https://github.com/apache/beam/pull/13118#discussion_r508101186
########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValues.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; + +/** + * <b><i>For internal use. No backwards compatibility guarantees.</i></b> + * + * <p>A primitive value within Beam. + */ +@Internal +public class PValues { + + // Do not instantiate + private PValues() {} + + // For backwards-compatibility, PCollectionView is still a "PValue" to users, which occurs in Review comment: Might as well make this javadoc. ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValues.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; + +/** + * <b><i>For internal use. No backwards compatibility guarantees.</i></b> + * + * <p>A primitive value within Beam. + */ +@Internal +public class PValues { + + // Do not instantiate + private PValues() {} + + // For backwards-compatibility, PCollectionView is still a "PValue" to users, which occurs in + // three places: + // + // POutput#expand (users can write custom POutputs) + // PInput#expand (users can write custom PInputs) + // PTransform#getAdditionalInputs (users can have their composites report inputs not passed by + // apply()) + // + // These all return Map<TupleTag<?> PValue>. A user's implementation of these methods is permitted + // to return + // either a PCollection or a PCollectionView for each PValue. PCollection's expand to themselves + // and + // PCollectionView expands to the PCollection that it is a view of. 
+ public static Map<TupleTag<?>, PCollection<?>> fullyExpand( + Map<TupleTag<?>, PValue> partiallyExpanded) { + Map<TupleTag<?>, PCollection<?>> result = new LinkedHashMap<>(); + for (Map.Entry<TupleTag<?>, PValue> pvalue : partiallyExpanded.entrySet()) { + if (pvalue.getValue() instanceof PCollection) { + PCollection<?> previous = result.put(pvalue.getKey(), (PCollection<?>) pvalue.getValue()); + checkArgument( + previous == null, + "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s", + partiallyExpanded, + TupleTag.class.getSimpleName(), + pvalue.getKey(), + previous, + pvalue.getValue()); + } else { + if (pvalue.getValue().expand().size() == 1 + && Iterables.getOnlyElement(pvalue.getValue().expand().values()) + .equals(pvalue.getValue())) { + throw new IllegalStateException( + String.format( + "Non %s %s that expands into itself %s", + PCollection.class.getSimpleName(), + PValue.class.getSimpleName(), + pvalue.getValue())); + } + // At this point we know it is a PCollectionView or some internal hacked PValue. To be Review comment: nit: if you use `/* */` blocks they will get line formatted correctly ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformReplacements.java ########## @@ -15,20 +15,22 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.beam.runners.core.construction; +package org.apache.beam.sdk.runners; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import java.util.Map; import java.util.Set; -import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; /** */ +@Internal Review comment: Why did this (and related classes) have to move? I could be convinced otherwise to keep the move, but it looks like long term we would want to get rid of Pipeline#replaceAll some day since runners should only be doing proto -> proto conversions. The only place I could find it in was PipelineTest. Can we instead move that test to somewhere in runners core construction? ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java ########## @@ -1403,4 +1427,55 @@ public int size() { }; } } + + public static <InputT, ViewT> PCollectionView<ViewT> findPCollectionView( Review comment: Can we choose not to expose this, as it looks like it is only used within PipelineTest.java? If there is a future need we can move it again (in hopefully a much smaller change) so it's easier to reason about visibility. ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValues.java ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.values; Review comment: This might make more sense in `org.apache.beam.sdk.runners` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
