This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c6154382263a68d7eca893c7da3617d177e4c1df
Author: Eugene Kirpichov <kirpic...@google.com>
AuthorDate: Wed Nov 15 20:19:09 2017 -0800

    Unifies windowed and unwindowed finalize.
---
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 232 ++++++++-------------
 .../java/org/apache/beam/sdk/transforms/Reify.java |  73 ++++++-
 .../apache/beam/sdk/values/TypeDescriptors.java    |   4 +
 3 files changed, 163 insertions(+), 146 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 9cfabfe..87459e9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -42,11 +42,11 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
 import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
 import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
@@ -55,14 +55,13 @@ import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reify;
 import org.apache.beam.sdk.transforms.Reshuffle;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
@@ -653,6 +652,9 @@ public class WriteFiles<UserT, DestinationT, OutputT>
                   .discardingFiredPanes());
     }
 
+    final FileBasedSink.DynamicDestinations<?, DestinationT, OutputT> 
destinations =
+        writeOperation.getSink().getDynamicDestinations();
+
     // Perform the per-bundle writes as a ParDo on the input PCollection (with the
     // WriteOperation as a side input) and collect the results of the writes in a
     // PCollection. There is a dependency between this ParDo and the first (the
@@ -663,19 +665,6 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     List<PCollectionView<Integer>> shardingSideInputs = numShardsView == null
         ? ImmutableList.<PCollectionView<Integer>>of()
         : ImmutableList.of(numShardsView);
-    SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards =
-        new SerializableFunction<DoFn.ProcessContext, Integer>() {
-          @Override
-          public Integer apply(DoFn<?, ?>.ProcessContext c) {
-            if (numShardsView != null) {
-              return c.sideInput(numShardsView);
-            } else if (numShardsProvider != null) {
-              return numShardsProvider.get();
-            } else {
-              return null;
-            }
-          }
-        };
 
     @SuppressWarnings("unchecked")
     Coder<BoundedWindow> shardedWindowCoder =
@@ -683,13 +672,12 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     final Coder<DestinationT> destinationCoder;
     try {
       destinationCoder =
-          sink.getDynamicDestinations()
-              
.getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
+          
destinations.getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
       destinationCoder.verifyDeterministic();
     } catch (CannotProvideCoderException | NonDeterministicException e) {
       throw new RuntimeException(e);
     }
-    FileResultCoder<DestinationT> fileResultCoder =
+    final FileResultCoder<DestinationT> fileResultCoder =
         FileResultCoder.of(shardedWindowCoder, destinationCoder);
 
     PCollection<FileResult<DestinationT>> results;
@@ -749,155 +737,109 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
     results.setCoder(fileResultCoder);
 
-    PCollection<KV<DestinationT, String>> outputFilenames;
+    PCollection<Iterable<FileResult<DestinationT>>> fileResultBundles;
     if (windowedWrites) {
-      // We need to materialize the FileResult's before the renaming stage: this can be done either
-      // via a side input or via a GBK. However, when processing streaming windowed writes, results
-      // will arrive multiple times. This means we can't share the below implementation that turns
-      // the results into a side input, as new data arriving into a side input does not trigger the
-      // listening DoFn. We also can't use a GBK because we need only the materialization, but not
-      // the (potentially lossy, if the user's trigger is lossy) continuation triggering that GBK
-      // would give. So, we use a reshuffle (over a single key to maximize bundling).
-      outputFilenames =
-          results
-              .apply(WithKeys.<Void, FileResult<DestinationT>>of((Void) null))
-              .setCoder(KvCoder.of(VoidCoder.of(), results.getCoder()))
-              .apply("Reshuffle", Reshuffle.<Void, 
FileResult<DestinationT>>of())
-              .apply(Values.<FileResult<DestinationT>>create())
+      // Reshuffle the results to make them stable against retries.
+      // Use a single void key to maximize size of bundles for finalization.
+      PCollection<FileResult<DestinationT>> stableResults = results
+          .apply("Add void key", WithKeys.<Void, 
FileResult<DestinationT>>of((Void) null))
+          .apply("Reshuffle", Reshuffle.<Void, FileResult<DestinationT>>of())
+          .apply("Drop key", Values.<FileResult<DestinationT>>create());
+      fileResultBundles =
+          stableResults
               .apply(
-                  "FinalizeWindowed",
-                  ParDo.of(new FinalizeWindowedFn<>(getFixedNumShards, 
writeOperation))
-                      .withSideInputs(shardingSideInputs))
-              .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
+                  "Gather bundles",
+                  ParDo.of(new 
GatherBundlesPerWindowFn<FileResult<DestinationT>>()))
+              .setCoder(IterableCoder.of(fileResultCoder));
     } else {
-      PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
-          results.apply(View.<FileResult<DestinationT>>asIterable());
-
-      // Finalize the write in another do-once ParDo on the singleton collection containing the
-      // Writer. The results from the per-bundle writes are given as an Iterable side input.
-      // The WriteOperation's state is the same as after its initialization in the first
-      // do-once ParDo. There is a dependency between this ParDo and the parallel write (the writer
-      // results collection as a side input), so it will happen after the parallel write.
-      // For the non-windowed case, we guarantee that  if no data is written but the user has
-      // set numShards, then all shards will be written out as empty files. For this reason we
-      // use a side input here.
-      outputFilenames =
-          p.apply(Create.of((Void) null))
-              .apply(
-                  "FinalizeUnwindowed",
-                  ParDo.of(
-                          new FinalizeUnwindowedFn<>(
-                              getFixedNumShards, resultsView, writeOperation))
-                      .withSideInputs(
-                          FluentIterable.concat(sideInputs, shardingSideInputs)
-                              .append(resultsView)
-                              .toList()))
-              .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
+      // Pass results via a side input rather than reshuffle, because we need to get an empty
+      // iterable to finalize if there are no results.
+      fileResultBundles =
+          p.apply(
+              Reify.viewInGlobalWindow(
+                  results.apply(View.<FileResult<DestinationT>>asIterable()),
+                  IterableCoder.of(fileResultCoder)));
     }
 
+    class FinalizeFn extends DoFn<Iterable<FileResult<DestinationT>>, 
KV<DestinationT, String>> {
+      @ProcessElement
+      public void process(ProcessContext c) throws Exception {
+        
writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
+        @Nullable Integer fixedNumShards;
+        if (numShardsView != null) {
+          fixedNumShards = c.sideInput(numShardsView);
+        } else if (numShardsProvider != null) {
+          fixedNumShards = numShardsProvider.get();
+        } else {
+          checkState(!windowedWrites, "Windowed write should have set fixed 
sharding");
+          fixedNumShards = null;
+        }
+        List<FileResult<DestinationT>> fileResults = 
Lists.newArrayList(c.element());
+        LOG.info("Finalizing {} file results", fileResults.size());
+        DestinationT defaultDest = destinations.getDefaultDestination();
+        List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames 
=
+            fileResults.isEmpty()
+                ? writeOperation.finalizeDestination(
+                defaultDest, GlobalWindow.INSTANCE, fixedNumShards, 
fileResults)
+                : finalizeAllDestinations(fileResults, fixedNumShards);
+        for (KV<FileResult<DestinationT>, ResourceId> entry : 
resultsToFinalFilenames) {
+          FileResult<DestinationT> res = entry.getKey();
+          c.output(KV.of(res.getDestination(), entry.getValue().toString()));
+        }
+        writeOperation.moveToOutputFiles(resultsToFinalFilenames);
+      }
+    }
+
+    List<PCollectionView<?>> sideInputs =
+        FluentIterable.concat(this.sideInputs, shardingSideInputs).toList();
+    PCollection<KV<DestinationT, String>> outputFilenames =
+        fileResultBundles
+            .apply("Finalize", ParDo.of(new 
FinalizeFn()).withSideInputs(sideInputs))
+            .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
+
     TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag =
         new TupleTag<>("perDestinationOutputFilenames");
     return WriteFilesResult.in(
         input.getPipeline(), perDestinationOutputFilenamesTag, 
outputFilenames);
   }
 
-  private static class FinalizeWindowedFn<DestinationT>
-      extends DoFn<FileResult<DestinationT>, KV<DestinationT, String>> {
-    private final SerializableFunction<DoFn.ProcessContext, Integer> 
getFixedNumShards;
-    private final WriteOperation<DestinationT, ?> writeOperation;
-
-    @Nullable private transient List<FileResult<DestinationT>> fileResults;
-    @Nullable private Integer fixedNumShards;
-
-    public FinalizeWindowedFn(
-        SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards,
-        WriteOperation<DestinationT, ?> writeOperation) {
-      this.getFixedNumShards = getFixedNumShards;
-      this.writeOperation = writeOperation;
+  private List<KV<FileResult<DestinationT>, ResourceId>> 
finalizeAllDestinations(
+      List<FileResult<DestinationT>> fileResults, @Nullable Integer 
fixedNumShards)
+      throws Exception {
+    Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> res =
+        ArrayListMultimap.create();
+    for (FileResult<DestinationT> result : fileResults) {
+      res.put(KV.of(result.getDestination(), result.getWindow()), result);
+    }
+    List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = 
Lists.newArrayList();
+    for (Map.Entry<KV<DestinationT, BoundedWindow>, 
Collection<FileResult<DestinationT>>>
+        destEntry : res.asMap().entrySet()) {
+      KV<DestinationT, BoundedWindow> destWindow = destEntry.getKey();
+      resultsToFinalFilenames.addAll(
+          writeOperation.finalizeDestination(
+              destWindow.getKey(), destWindow.getValue(), fixedNumShards, 
destEntry.getValue()));
     }
+    return resultsToFinalFilenames;
+  }
+
+  private static class GatherBundlesPerWindowFn<T> extends DoFn<T, 
Iterable<T>> {
+    @Nullable private transient Multimap<BoundedWindow, T> bundles = null;
 
     @StartBundle
     public void startBundle() {
-      fileResults = Lists.newArrayList();
-      fixedNumShards = null;
+      bundles = ArrayListMultimap.create();
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) {
-      fileResults.add(c.element());
-      if (fixedNumShards == null) {
-        fixedNumShards = getFixedNumShards.apply(c);
-        checkState(fixedNumShards != null, "Windowed write should have set 
fixed sharding");
-      }
+    public void process(ProcessContext c, BoundedWindow w) {
+      bundles.put(w, c.element());
     }
 
     @FinishBundle
     public void finishBundle(FinishBundleContext c) throws Exception {
-      List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames =
-          finalizeAllDestinations(writeOperation, fileResults, fixedNumShards);
-      for (KV<FileResult<DestinationT>, ResourceId> entry : 
resultsToFinalFilenames) {
-        FileResult<DestinationT> res = entry.getKey();
-        c.output(
-            KV.of(res.getDestination(), entry.getValue().toString()),
-            res.getWindow().maxTimestamp(),
-            res.getWindow());
+      for (BoundedWindow w : bundles.keySet()) {
+        c.output(Lists.newArrayList(bundles.get(w)), w.maxTimestamp(), w);
       }
-      writeOperation.moveToOutputFiles(resultsToFinalFilenames);
     }
   }
-
-  private static class FinalizeUnwindowedFn<DestinationT>
-      extends DoFn<Void, KV<DestinationT, String>> {
-    private final SerializableFunction<DoFn.ProcessContext, Integer> 
getFixedNumShards;
-    private final PCollectionView<Iterable<FileResult<DestinationT>>> 
resultsView;
-    private final WriteOperation<DestinationT, ?> writeOperation;
-
-    public FinalizeUnwindowedFn(
-        SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards,
-        PCollectionView<Iterable<FileResult<DestinationT>>> resultsView,
-        WriteOperation<DestinationT, ?> writeOperation) {
-      this.getFixedNumShards = getFixedNumShards;
-      this.resultsView = resultsView;
-      this.writeOperation = writeOperation;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      
writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
-      List<FileResult<DestinationT>> fileResults = 
Lists.newArrayList(c.sideInput(resultsView));
-      List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames =
-          fileResults.isEmpty()
-              ? writeOperation.finalizeDestination(
-                  
writeOperation.getSink().getDynamicDestinations().getDefaultDestination(),
-                  GlobalWindow.INSTANCE,
-                  getFixedNumShards.apply(c),
-                  ImmutableList.<FileResult<DestinationT>>of())
-              : finalizeAllDestinations(writeOperation, fileResults, 
getFixedNumShards.apply(c));
-      for (KV<FileResult<DestinationT>, ResourceId> entry : 
resultsToFinalFilenames) {
-        c.output(KV.of(entry.getKey().getDestination(), 
entry.getValue().toString()));
-      }
-      writeOperation.moveToOutputFiles(resultsToFinalFilenames);
-    }
-  }
-
-  private static <DestinationT>
-      List<KV<FileResult<DestinationT>, ResourceId>> finalizeAllDestinations(
-          WriteOperation<DestinationT, ?> writeOperation,
-          List<FileResult<DestinationT>> fileResults,
-          Integer fixedNumShards)
-          throws Exception {
-    List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = 
Lists.newArrayList();
-    Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> 
resultsByDestMultimap =
-        groupByDestinationAndWindow(fileResults);
-    for (Map.Entry<KV<DestinationT, BoundedWindow>, 
Collection<FileResult<DestinationT>>>
-        destEntry : resultsByDestMultimap.asMap().entrySet()) {
-      resultsToFinalFilenames.addAll(
-          writeOperation.finalizeDestination(
-              destEntry.getKey().getKey(),
-              destEntry.getKey().getValue(),
-              fixedNumShards,
-              destEntry.getValue()));
-    }
-    return resultsToFinalFilenames;
-  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
index caa89e6..7f5c881 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
@@ -18,17 +18,69 @@
 
 package org.apache.beam.sdk.transforms;
 
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.joda.time.Duration;
 
-/** {@link PTransform PTransforms} for reifying the timestamp, window and pane of values. */
+/**
+ * {@link PTransform PTransforms} for converting between explicit and implicit form of various Beam
+ * values.
+ */
 public class Reify {
+  private static class ReifyView<K, V>
+  extends PTransform<PCollection<K>, PCollection<KV<K, V>>> {
+    private final PCollectionView<V> view;
+    private final Coder<V> coder;
+
+    private ReifyView(PCollectionView<V> view, Coder<V> coder) {
+      this.view = view;
+      this.coder = coder;
+    }
+
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<K> input) {
+      return input
+          .apply(
+              ParDo.of(
+                      new DoFn<K, KV<K, V>>() {
+                        @ProcessElement
+                        public void process(ProcessContext c) {
+                          c.output(KV.of(c.element(), c.sideInput(view)));
+                        }
+                      })
+                  .withSideInputs(view))
+          .setCoder(KvCoder.of(input.getCoder(), coder));
+    }
+  }
+
+  private static class ReifyViewInGlobalWindow<V>
+  extends PTransform<PBegin, PCollection<V>> {
+    private final PCollectionView<V> view;
+    private final Coder<V> coder;
+
+    private ReifyViewInGlobalWindow(PCollectionView<V> view, Coder<V> coder) {
+      this.view = view;
+      this.coder = coder;
+    }
+
+    @Override
+    public PCollection<V> expand(PBegin input) {
+      return input
+          .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
+          .apply(Reify.<Void, V>viewAsValues(view, coder))
+          .apply(Values.<V>create());
+    }
+  }
+
   /** Private implementation of {@link #windows()}. */
   private static class Window<T>
       extends PTransform<PCollection<T>, PCollection<ValueInSingleWindow<T>>> {
@@ -184,9 +236,28 @@ public class Reify {
     return new WindowInValue<>();
   }
 
+  /** Extracts the timestamps from each value in a {@link KV}. */
   public static <K, V>
       PTransform<PCollection<KV<K, TimestampedValue<V>>>, PCollection<KV<K, 
V>>>
           extractTimestampsFromValues() {
     return new ExtractTimestampsFromValues<>();
   }
+
+  /**
+   * Pairs each element in a collection with the value of a side input associated with the element's
+   * window.
+   */
+  public static <K, V> PTransform<PCollection<K>, PCollection<KV<K, V>>> 
viewAsValues(
+      PCollectionView<V> view, Coder<V> coder) {
+    return new ReifyView<>(view, coder);
+  }
+
+  /**
+   * Returns a {@link PCollection} consisting of a single element, containing the value of the given
+   * view in the global window.
+   */
+  public static <K, V> PTransform<PBegin, PCollection<V>> viewInGlobalWindow(
+      PCollectionView<V> view, Coder<V> coder) {
+    return new ReifyViewInGlobalWindow<>(view, coder);
+  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
index e59f84b..8ef2a4d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
@@ -291,6 +291,10 @@ public class TypeDescriptors {
     return typeDescriptor;
   }
 
+  public static TypeDescriptor<Void> voids() {
+    return new TypeDescriptor<Void>() {};
+  }
+
   /**
    * A helper interface for use with {@link #extractFromTypeParameters(Object, Class,
    * TypeVariableExtractor)}.

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <commits@beam.apache.org>.

Reply via email to