kennknowles commented on a change in pull request #13118:
URL: https://github.com/apache/beam/pull/13118#discussion_r509442928



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValues.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+
+/**
+ * <b><i>For internal use. No backwards compatibility guarantees.</i></b>
+ *
+ * <p>A primitive value within Beam.
+ */
+@Internal
+public class PValues {
+
+  // Do not instantiate
+  private PValues() {}
+
+  // For backwards-compatibility, PCollectionView is still a "PValue" to users, which occurs in

Review comment:
       Done

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValues.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;

Review comment:
       I put it here because it is a [companion](https://docs.scala-lang.org/overviews/scala-book/companion-objects.html) to `PValue`. By convention, a companion class takes the plural name and lives in the same package. If you still want me to move it after this explanation, I will; I don't care enough to block on it.
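
For readers unfamiliar with the convention mentioned in this comment, here is a minimal Java sketch of a plural-named companion utility class. `Shape` and `Shapes` are hypothetical names invented for illustration and are not part of Beam. The JDK follows the same pattern with `java.util.Collection` and `java.util.Collections`.

```java
// Hypothetical interface standing in for a type such as PValue; not Beam's API.
interface Shape {
  double area();
}

// Companion utility class: plural name, same package, static helpers only.
final class Shapes {
  // Do not instantiate
  private Shapes() {}

  // Sums the areas of all given shapes.
  static double totalArea(Iterable<? extends Shape> shapes) {
    double total = 0.0;
    for (Shape shape : shapes) {
      total += shape.area();
    }
    return total;
  }
}
```

Callers would write `Shapes.totalArea(myShapes)`, which keeps helper logic out of the interface itself while leaving it easy to find next to the type it serves.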

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformReplacements.java
##########
@@ -15,20 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.core.construction;
+package org.apache.beam.sdk.runners;
 
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
 import java.util.Map;
 import java.util.Set;
-import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 
 /** */
+@Internal

Review comment:
       Resolved: I am removing the moves from the PR to avoid getting blocked 
on anything that could reopen discussions.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValues.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.values;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+
+/**
+ * <b><i>For internal use. No backwards compatibility guarantees.</i></b>
+ *
+ * <p>A primitive value within Beam.
+ */
+@Internal
+public class PValues {
+
+  // Do not instantiate
+  private PValues() {}
+
+  // For backwards-compatibility, PCollectionView is still a "PValue" to users, which occurs in
+  // three places:
+  //
+  //    POutput#expand (users can write custom POutputs)
+  //    PInput#expand (users can write custom PInputs)
+  //    PTransform#getAdditionalInputs (users can have their composites report inputs not passed by
+  //    apply())
+  //
+  // These all return Map<TupleTag<?>, PValue>. A user's implementation of these methods is permitted
+  // to return either a PCollection or a PCollectionView for each PValue. A PCollection expands to
+  // itself, and a PCollectionView expands to the PCollection that it is a view of.
+  public static Map<TupleTag<?>, PCollection<?>> fullyExpand(
+      Map<TupleTag<?>, PValue> partiallyExpanded) {
+    Map<TupleTag<?>, PCollection<?>> result = new LinkedHashMap<>();
+    for (Map.Entry<TupleTag<?>, PValue> pvalue : partiallyExpanded.entrySet()) {
+      if (pvalue.getValue() instanceof PCollection) {
+        PCollection<?> previous = result.put(pvalue.getKey(), (PCollection<?>) pvalue.getValue());
+        checkArgument(
+            previous == null,
+            "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s",
+            TupleTag.class.getSimpleName(),
+            partiallyExpanded,
+            pvalue.getKey(),
+            previous,
+            pvalue.getValue());
+      } else {
+        if (pvalue.getValue().expand().size() == 1
+            && Iterables.getOnlyElement(pvalue.getValue().expand().values())
+                .equals(pvalue.getValue())) {
+          throw new IllegalStateException(
+              String.format(
+                  "Non %s %s that expands into itself %s",
+                  PCollection.class.getSimpleName(),
+                  PValue.class.getSimpleName(),
+                  pvalue.getValue()));
+        }
+        // At this point we know it is a PCollectionView or some internal hacked PValue. To be

Review comment:
       Done
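
The expansion rule described in the comment above the quoted `fullyExpand` can be sketched with stand-in types. Everything below (`Value`, `SketchCollection`, `SketchView`, `SketchExpansion`, and the use of `String` tags) is hypothetical and is not Beam's API. The quoted hunk is cut off before the non-`PCollection` branch finishes, so the way that branch is completed here is an assumption based only on the comment's statement that a view expands to the collection it is a view of.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in for PValue; illustration only.
interface Value {
  Map<String, Value> expand();
}

// Stand-in for PCollection: expands to itself.
final class SketchCollection implements Value {
  private final String tag;

  SketchCollection(String tag) {
    this.tag = tag;
  }

  @Override
  public Map<String, Value> expand() {
    return Collections.<String, Value>singletonMap(tag, this);
  }
}

// Stand-in for PCollectionView: expands to the collection it is a view of.
final class SketchView implements Value {
  private final String tag;
  private final SketchCollection underlying;

  SketchView(String tag, SketchCollection underlying) {
    this.tag = tag;
    this.underlying = underlying;
  }

  @Override
  public Map<String, Value> expand() {
    return Collections.<String, Value>singletonMap(tag, underlying);
  }
}

final class SketchExpansion {
  // Do not instantiate
  private SketchExpansion() {}

  // Flattens a map of values into a map of collections, rejecting duplicate tags.
  static Map<String, SketchCollection> fullyExpand(Map<String, Value> partiallyExpanded) {
    Map<String, SketchCollection> result = new LinkedHashMap<>();
    for (Map.Entry<String, Value> entry : partiallyExpanded.entrySet()) {
      if (entry.getValue() instanceof SketchCollection) {
        putUnique(result, entry.getKey(), (SketchCollection) entry.getValue());
      } else {
        // Assumed behavior: expand one more level and require collections there.
        for (Map.Entry<String, Value> inner : entry.getValue().expand().entrySet()) {
          if (!(inner.getValue() instanceof SketchCollection)) {
            throw new IllegalStateException("Expansion did not yield a collection: " + inner);
          }
          putUnique(result, inner.getKey(), (SketchCollection) inner.getValue());
        }
      }
    }
    return result;
  }

  private static void putUnique(
      Map<String, SketchCollection> result, String tag, SketchCollection collection) {
    SketchCollection previous = result.put(tag, collection);
    if (previous != null) {
      throw new IllegalArgumentException("Conflicting entries for tag " + tag);
    }
  }
}
```

The sketch uses a `LinkedHashMap` for the result, as the quoted code does, so the output preserves the iteration order of the input map.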

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
##########
@@ -1403,4 +1427,55 @@ public int size() {
       };
     }
   }
+
+  public static <InputT, ViewT> PCollectionView<ViewT> findPCollectionView(

Review comment:
       I've moved it into the test class where it is used. I would still like 
to leave the alteration of `DataflowRunner` out of this PR, since my other PR 
that just adds a `checkState` illustrates that the `DataflowRunner` batch view 
overrides result in a corrupted graph that sort of works by luck. I don't want 
to disturb that potentially sensitive situation.





