robertwb commented on a change in pull request #15665:
URL: https://github.com/apache/beam/pull/15665#discussion_r725349281
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##########
@@ -36,102 +36,121 @@
/**
* <b>For internal use only; no backwards compatibility guarantees.</b>
*
- * <p>A {@link PTransform} that returns a {@link PCollection} equivalent to
its input but
+ * <p>{@link PTransform}(s) that returns a {@link PCollection} equivalent to
its input but
* operationally provides some of the side effects of a {@link GroupByKey}, in
particular
* checkpointing, and preventing fusion of the surrounding transforms.
*
* <p>Performs a {@link GroupByKey} so that the data is key-partitioned.
Configures the {@link
* WindowingStrategy} so that no data is dropped, but doesn't affect the need
for the user to
* specify allowed lateness and accumulation mode before a user-inserted
GroupByKey.
- *
- * @param <K> The type of key being reshuffled on.
- * @param <V> The type of value being reshuffled.
- * @deprecated this transform's intended side effects are not portable; it
will likely be removed
*/
@Internal
-@Deprecated
-public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>,
PCollection<KV<K, V>>> {
+public class Reshuffle {
private Reshuffle() {}
- public static <K, V> Reshuffle<K, V> of() {
- return new Reshuffle<>();
+ /** @deprecated use {@link #perKey()}. */
+ @Deprecated
+ public static <K, V> PerKey<K, V> of() {
+ return new PerKey<>();
+ }
+
+ public static <K, V> PerKey<K, V> perKey() {
+ return new PerKey<>();
+ }
+
+ /** @deprecated use {@link #perRandomKey()}. */
+ @Deprecated
+ public static <T> PerRandomKey<T> viaRandomKey() {
+ return perRandomKey();
}
/**
- * Encapsulates the sequence "pair input with unique key, apply {@link
Reshuffle#of}, drop the
- * key" commonly used to break fusion.
+ * Encapsulates the sequence "pair input with unique key, apply {@link
Reshuffle#perKey()}, drop
+ * the key" commonly used to break fusion.
*/
@Experimental
- public static <T> ViaRandomKey<T> viaRandomKey() {
- return new ViaRandomKey<>();
- }
-
- @Override
- public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
- WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
- // If the input has already had its windows merged, then the GBK that
performed the merge
- // will have set originalStrategy.getWindowFn() to InvalidWindows, causing
the GBK contained
- // here to fail. Instead, we install a valid WindowFn that leaves all
windows unchanged.
- // The TimestampCombiner is set to ensure the GroupByKey does not shift
elements forwards in
- // time.
- // Because this outputs as fast as possible, this should not hold the
watermark.
- Window<KV<K, V>> rewindow =
- Window.<KV<K, V>>into(new
IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
- .triggering(new ReshuffleTrigger<>())
- .discardingFiredPanes()
- .withTimestampCombiner(TimestampCombiner.EARLIEST)
-
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
-
- return input
- .apply(rewindow)
- .apply("ReifyOriginalTimestamps", Reify.timestampsInValue())
- .apply(GroupByKey.create())
- // Set the windowing strategy directly, so that it doesn't get counted
as the user having
- // set allowed lateness.
- .setWindowingStrategyInternal(originalStrategy)
- .apply(
- "ExpandIterable",
- ParDo.of(
- new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K,
TimestampedValue<V>>>() {
- @ProcessElement
- public void processElement(
- @Element KV<K, Iterable<TimestampedValue<V>>> element,
- OutputReceiver<KV<K, TimestampedValue<V>>> r) {
- K key = element.getKey();
- for (TimestampedValue<V> value : element.getValue()) {
- r.output(KV.of(key, value));
- }
- }
- }))
- .apply("RestoreOriginalTimestamps",
ReifyTimestamps.extractFromValues());
+ public static <T> PerRandomKey<T> perRandomKey() {
+ return new PerRandomKey<>();
}
/** Implementation of {@link #viaRandomKey()}. */
- public static class ViaRandomKey<T> extends PTransform<PCollection<T>,
PCollection<T>> {
- private ViaRandomKey() {}
+ public static class PerRandomKey<T> extends PTransform<PCollection<T>,
PCollection<T>> {
+ private PerRandomKey() {}
- private ViaRandomKey(@Nullable Integer numBuckets) {
+ private PerRandomKey(@Nullable Integer numBuckets) {
this.numBuckets = numBuckets;
}
// The number of buckets to shard into. This is a performance optimization
to prevent having
// unit sized bundles on the output. If unset, uses a random integer key.
private @Nullable Integer numBuckets;
- public ViaRandomKey<T> withNumBuckets(@Nullable Integer numBuckets) {
- return new ViaRandomKey<>(numBuckets);
+ public PerRandomKey<T> withNumBuckets(@Nullable Integer numBuckets) {
+ return new PerRandomKey<>(numBuckets);
}
@Override
public PCollection<T> expand(PCollection<T> input) {
return input
.apply("Pair with random key", ParDo.of(new
AssignShardFn<>(numBuckets)))
- .apply(Reshuffle.of())
+ .apply(Reshuffle.perKey())
.apply(Values.create());
}
}
+ /**
+ * Implementation of {@link #perKey*()}.
+ *
+ * @param <K> The type of key being reshuffled on.
+ * @param <V> The type of value being reshuffled.
+ */
+ public static class PerKey<K, V>
Review comment:
Dataflow and Flink pipelines support updating/resuming from a
checkpoint. To line up the old stages with the new ones, the runner matches
them by stage name (at least for stateful operations, which GBK/shuffling is).
Yeah, it's a pain, and there are some thoughts around making things better, but
that's what we have to do for now.
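To make the name matching concrete, here is a toy sketch in plain Java. It is not Dataflow or Flink code: the class, the method, and both stage names are made up for illustration, and the only assumption is that checkpointed state is keyed by stage name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model only (hypothetical, not runner code): checkpointed state keyed by stage name. */
final class StageNameMatching {
  /** Returns the saved state for a stage of the updated pipeline, if its name still matches. */
  static Optional<String> restore(Map<String, String> checkpoint, String newStageName) {
    return Optional.ofNullable(checkpoint.get(newStageName));
  }

  public static void main(String[] args) {
    Map<String, String> checkpoint = new HashMap<>();
    // Hypothetical stage name from the old pipeline, with its buffered shuffle state.
    checkpoint.put("Reshuffle/GroupByKey", "buffered-elements");

    // The updated pipeline kept the name, so the state is found.
    System.out.println(restore(checkpoint, "Reshuffle/GroupByKey").isPresent());
    // The updated pipeline renamed the stage, so the old state no longer lines up.
    System.out.println(restore(checkpoint, "Reshuffle.PerKey/GroupByKey").isPresent());
  }
}
```

Under that assumption, renaming the class that contains the GBK changes the stage name and leaves the old state with nothing to attach to.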
##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -365,6 +365,28 @@ message StandardPTransforms {
// Represents the GroupIntoBatches.WithShardedKey operation.
// Payload: GroupIntoBatchesPayload
GROUP_INTO_BATCHES_WITH_SHARDED_KEY = 6 [(beam_urn) =
"beam:transform:group_into_batches_with_sharded_key:v1"];
+
+ // Represents Reshuffle.perKey() operation, a PTransform that returns a
PCollection
+ // equivalent to its input and ideally redistributed among the available
workers
+ // with all data of a given key going to the same worker.
+ // The output of this operation, including windowing operation should be
identical
Review comment:
including windowing operation -> including windowing metadata,
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
##########
@@ -159,6 +159,8 @@
new CreateViewStreamingTranslator());
TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new
ReshuffleTranslatorStreaming());
Review comment:
There's no requirement that the SDK and Runner versions be updated in
sync, so we may have to keep this around for a while. Not the end of the world
though.
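A rough sketch of what keeping it around looks like, in plain Java rather than the actual Flink translator code. The URN strings and the registry class are placeholders invented for this example; the only name taken from the diff is ReshuffleTranslatorStreaming, used here as a plain label.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry: both the legacy and the new URN resolve to a translator. */
final class TranslatorRegistrySketch {
  // Placeholder URNs, not the real Beam constants.
  static final String LEGACY_URN = "example:reshuffle:v1";
  static final String PER_KEY_URN = "example:reshuffle_per_key:v1";

  static Map<String, String> translators() {
    Map<String, String> registry = new HashMap<>();
    // Kept so pipelines submitted by an older SDK still translate.
    registry.put(LEGACY_URN, "ReshuffleTranslatorStreaming");
    // Added for pipelines submitted by a newer SDK.
    registry.put(PER_KEY_URN, "ReshuffleTranslatorStreaming");
    return registry;
  }

  static String translatorFor(String urn) {
    String translator = translators().get(urn);
    if (translator == null) {
      throw new IllegalArgumentException("No translator registered for " + urn);
    }
    return translator;
  }

  public static void main(String[] args) {
    System.out.println(translatorFor(LEGACY_URN));
    System.out.println(translatorFor(PER_KEY_URN));
  }
}
```

Dropping the legacy entry would only be safe once no supported SDK version still emits the old URN.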