lukecwik commented on a change in pull request #13118:
URL: https://github.com/apache/beam/pull/13118#discussion_r508101186



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValues.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Internal;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+
+/**
+ * <b><i>For internal use. No backwards compatibility guarantees.</i></b>
+ *
+ * <p>A primitive value within Beam.
+ */
+@Internal
+public class PValues {
+
+  // Do not instantiate
+  private PValues() {}
+
+  // For backwards-compatibility, PCollectionView is still a "PValue" to 
users, which occurs in

Review comment:
       Might as well and make this javadoc.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValues.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Internal;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+
+/**
+ * <b><i>For internal use. No backwards compatibility guarantees.</i></b>
+ *
+ * <p>A primitive value within Beam.
+ */
+@Internal
+public class PValues {
+
+  // Do not instantiate
+  private PValues() {}
+
+  // For backwards-compatibility, PCollectionView is still a "PValue" to 
users, which occurs in
+  // three places:
+  //
+  //    POutput#expand (users can write custom POutputs)
+  //    PInput#expand (users can write custom PInputs)
+  //    PTransform#getAdditionalInputs (users can have their composites report 
inputs not passed by
+  // apply())
+  //
+  // These all return Map<TupleTag<?> PValue>. A user's implementation of 
these methods is permitted
+  // to return
+  // either a PCollection or a PCollectionView for each PValue. PCollection's 
expand to themselves
+  // and
+  // PCollectionView expands to the PCollection that it is a view of.
+  public static Map<TupleTag<?>, PCollection<?>> fullyExpand(
+      Map<TupleTag<?>, PValue> partiallyExpanded) {
+    Map<TupleTag<?>, PCollection<?>> result = new LinkedHashMap<>();
+    for (Map.Entry<TupleTag<?>, PValue> pvalue : partiallyExpanded.entrySet()) 
{
+      if (pvalue.getValue() instanceof PCollection) {
+        PCollection<?> previous = result.put(pvalue.getKey(), (PCollection<?>) 
pvalue.getValue());
+        checkArgument(
+            previous == null,
+            "Found conflicting %ss in flattened expansion of %s: %s maps to %s 
and %s",
+            partiallyExpanded,
+            TupleTag.class.getSimpleName(),
+            pvalue.getKey(),
+            previous,
+            pvalue.getValue());
+      } else {
+        if (pvalue.getValue().expand().size() == 1
+            && Iterables.getOnlyElement(pvalue.getValue().expand().values())
+                .equals(pvalue.getValue())) {
+          throw new IllegalStateException(
+              String.format(
+                  "Non %s %s that expands into itself %s",
+                  PCollection.class.getSimpleName(),
+                  PValue.class.getSimpleName(),
+                  pvalue.getValue()));
+        }
+        // At this point we know it is a PCollectionView or some internal 
hacked PValue. To be

Review comment:
       nit: if you use `/* */` blocks they will get line formatted correctly

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformReplacements.java
##########
@@ -15,20 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.core.construction;
+package org.apache.beam.sdk.runners;
 
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
 import java.util.Map;
 import java.util.Set;
-import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 
 /** */
+@Internal

Review comment:
       Why did this (and related classes) have to move?
   I could be convinced otherwise to keep the move but it looks like long term 
we would want to get rid of Pipeline#replaceAll some day since runners should 
only doing proto -> proto conversions.
   
   The only place I could find it in was PipelineTest. Can we instead move that 
test to somewhere in runners core construction?
   

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
##########
@@ -1403,4 +1427,55 @@ public int size() {
       };
     }
   }
+
+  public static <InputT, ViewT> PCollectionView<ViewT> findPCollectionView(

Review comment:
       Can we choose not to expose this as it looks like it is only used within 
PipelineTest.java
   
   If there is a future need we can move it again (in hopefully a much smaller 
change) so its easier to reason about visibility.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValues.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;

Review comment:
       This might make more sense in `org.apache.beam.sdk.runners`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to