kennknowles commented on code in PR #31490:
URL: https://github.com/apache/beam/pull/31490#discussion_r1633336029


##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto:
##########
@@ -327,6 +327,11 @@ message StandardPTransforms {
     //
     // Payload: none
     TO_STRING = 8 [(beam_urn) = "beam:transform:to_string:v1"];
+
+    // Specialized implementation of GroupByKey for translating Redistribute 
transform into
+    // Dataflow service protos.
+    DATAFLOW_GROUP_BY_KEY = 9

Review Comment:
   I don't think you need this: this transform should never appear in a portable graph, only in the v1beta3 representation (which is produced by a pure Java override that translates straight to v1beta3 and does not need the proto).



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DataflowGroupByKey.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Specialized implementation of {@code GroupByKey} for translating 
Redistribute transform into
+ * Dataflow service protos.
+ */
+public class DataflowGroupByKey<K, V>
+    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> 
{
+
+  // Plumbed from Redistribute transform.
+  private final boolean allowDuplicates;
+
+  private DataflowGroupByKey(boolean allowDuplicates) {
+    this.allowDuplicates = allowDuplicates;
+  }
+
+  /**
+   * Returns a {@code DataflowGroupByKey<K, V>} {@code PTransform}.
+   *
+   * @param <K> the type of the keys of the input and output {@code 
PCollection}s
+   * @param <V> the type of the values of the input {@code PCollection} and 
the elements of the
+   *     {@code Iterable}s in the output {@code PCollection}
+   */
+  public static <K, V> DataflowGroupByKey<K, V> create() {
+    return new DataflowGroupByKey<>(false);
+  }
+
+  /**
+   * Returns a {@code DataflowGroupByKey<K, V>} {@code PTransform} that its 
output can have
+   * duplicated elements.
+   *
+   * @param <K> the type of the keys of the input and output {@code 
PCollection}s
+   * @param <V> the type of the values of the input {@code PCollection} and 
the elements of the
+   *     {@code Iterable}s in the output {@code PCollection}
+   */
+  public static <K, V> DataflowGroupByKey<K, V> createWithAllowDuplicates() {
+    return new DataflowGroupByKey<>(true);
+  }
+
+  /** Returns whether it allows duplicated elements in the output. */
+  public boolean allowDuplicates() {
+    return allowDuplicates;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  public static void applicableTo(PCollection<?> input) {
+    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+    // Verify that the input PCollection is bounded, or that there is 
windowing/triggering being
+    // used. Without this, the watermark (at end of global window) will never 
be reached.
+    if (windowingStrategy.getWindowFn() instanceof GlobalWindows
+        && windowingStrategy.getTrigger() instanceof DefaultTrigger
+        && input.isBounded() != IsBounded.BOUNDED) {
+      throw new IllegalStateException(
+          "DataflowGroupByKey cannot be applied to non-bounded PCollection in 
the GlobalWindow"
+              + " without a trigger. Use a Window.into or Window.triggering 
transform prior to"
+              + " DataflowGroupByKey.");
+    }
+  }
+
+  public WindowingStrategy<?, ?> updateWindowingStrategy(WindowingStrategy<?, 
?> inputStrategy) {
+    // If the WindowFn was merging, set the bit to indicate it is already 
merged.
+    // Switch to the continuation trigger associated with the current trigger.
+    return inputStrategy
+        .withAlreadyMerged(!inputStrategy.getWindowFn().isNonMerging())
+        .withTrigger(inputStrategy.getTrigger().getContinuationTrigger());
+  }
+
+  @Override
+  public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+    applicableTo(input);
+
+    // Verify that the input Coder<KV<K, V>> is a KvCoder<K, V>, and that
+    // the key coder is deterministic.
+    Coder<K> keyCoder = getKeyCoder(input.getCoder());
+    try {
+      keyCoder.verifyDeterministic();
+    } catch (NonDeterministicException e) {
+      throw new IllegalStateException(
+          "the keyCoder of a DataflowGroupByKey must be deterministic", e);
+    }
+
+    // This primitive operation groups by the combination of key and window,
+    // merging windows as needed, using the windows assigned to the
+    // key/value input elements and the window merge operation of the
+    // window function associated with the input PCollection.
+    return PCollection.createPrimitiveOutputInternal(
+        input.getPipeline(),
+        updateWindowingStrategy(input.getWindowingStrategy()),
+        input.isBounded(),
+        getOutputKvCoder(input.getCoder()));
+  }
+
+  /**
+   * Returns the {@code Coder} of the input to this transform, which should be 
a {@code KvCoder}.
+   */
+  @SuppressWarnings("unchecked")
+  static <K, V> KvCoder<K, V> getInputKvCoder(Coder<?> inputCoder) {
+    if (!(inputCoder instanceof KvCoder)) {
+      throw new IllegalStateException("DataflowGroupByKey requires its input 
to use KvCoder");
+    }
+    return (KvCoder<K, V>) inputCoder;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Returns the {@code Coder} of the keys of the input to this transform, 
which is also used as the
+   * {@code Coder} of the keys of the output of this transform.
+   */
+  public static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) {

Review Comment:
   These don't need to be public, right?



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/RedistributeByKeyOverrideFactory.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import java.util.Collections;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import 
org.apache.beam.sdk.runners.PTransformOverrideFactory.PTransformReplacement;
+import org.apache.beam.sdk.transforms.DataflowGroupByKey;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Redistribute.RedistributeByKey;
+import org.apache.beam.sdk.transforms.Reify;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IdentityWindowFn;
+import org.apache.beam.sdk.util.construction.PTransformReplacements;
+import org.apache.beam.sdk.util.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Duration;
+
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)

Review Comment:
   What are the incompatible types? Basically, treat nullness the same way you would treat any other type error. You can suppress at a very fine-grained, local level if absolutely necessary, but in this case I think you probably just need to add a null check wherever a value is nullable.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java:
##########
@@ -177,6 +179,9 @@ public class PTransformTranslation {
     
checkState(FLATTEN_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Primitives.FLATTEN)));
     checkState(
         
GROUP_BY_KEY_TRANSFORM_URN.equals(getUrn(StandardPTransforms.Primitives.GROUP_BY_KEY)));
+    checkState(

Review Comment:
   I don't think we need this check, since this transform should never show up in a portable graph. Removing the URN from the proto is, I think, a good way to protect us from mistakes.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/DataflowGroupByKeyTranslation.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.construction;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DataflowGroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * Utility methods for translating a {@link DataflowGroupByKey} to and from 
{@link RunnerApi}
+ * representations.
+ */
+@SuppressWarnings({
+  "nullness", // TODO(https://github.com/apache/beam/issues/20497)

Review Comment:
   You're going to have to get these right. You can add null checks where you need them. This isn't an optional warning; it is a type system that protects against errors and helps enforce good coding practices. (Most code that struggles with null checks can be rewritten using approaches that avoid the problem.)
   
   Treat it the same as any other type error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to