[ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150410&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150410
 ]

ASF GitHub Bot logged work on BEAM-5445:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Oct/18 15:03
            Start Date: 02/Oct/18 15:03
    Worklog Time Spent: 10m 
      Work Description: nielm commented on a change in pull request #6478: 
[BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds 
streaming support
URL: https://github.com/apache/beam/pull/6478#discussion_r221988893
 
 

 ##########
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 ##########
 @@ -880,192 +868,191 @@ public SpannerWriteResult 
expand(PCollection<MutationGroup> input) {
               .apply("Schema View", View.asSingleton());
 
       // Split the mutations into batchable and unbatchable mutations.
-      // Filter out mutation groups too big to be batched
+      // Filter out mutation groups too big to be batched.
       PCollectionTuple filteredMutations =
-          input.apply(
-              "Filter Unbatchable Mutations",
-              ParDo.of(
-                      new BatchableMutationFilterFn(
-                          schemaView,
-                          UNBATCHABLE_MUTATIONS_TAG,
-                          spec.getBatchSizeBytes(),
-                          spec.getMaxNumMutations()))
-                  .withSideInputs(schemaView)
-                  .withOutputTags(
-                      BATCHABLE_MUTATIONS_TAG, 
TupleTagList.of(UNBATCHABLE_MUTATIONS_TAG)));
-
-      // Serialize batchable mutations, we don't need to encode/decode them 
while reshuffling.
-      // The primary key is encoded via OrderedCode so we can calculate 
quantiles.
-      PCollection<SerializedMutation> serialized =
-          filteredMutations
-              .get(BATCHABLE_MUTATIONS_TAG)
+          input
+              .apply("To Global Window", Window.into(new GlobalWindows()))
               .apply(
-                  "Serialize mutations",
-                  ParDo.of(new 
SerializeMutationsFn(schemaView)).withSideInputs(schemaView))
-              .setCoder(SerializedMutationCoder.of());
-
-      // Sample primary keys using ApproximateQuantiles.
-      PCollectionView<Map<String, List<byte[]>>> keySample =
-          serialized
-              .apply("Extract keys", ParDo.of(new ExtractKeys()))
-              .apply("Sample keys", sampler)
-              .apply("Keys sample as view", View.asMap());
-
-      TupleTag<Void> mainTag = new TupleTag<>("mainOut");
-      TupleTag<MutationGroup> failedTag = new TupleTag<>("failedMutations");
-      // Assign partition based on the closest element in the sample and group 
mutations.
-      AssignPartitionFn assignPartitionFn = new AssignPartitionFn(keySample);
+                  "Filter Unbatchable Mutations",
+                  ParDo.of(
+                          new BatchableMutationFilterFn(
+                              schemaView,
+                              UNBATCHABLE_MUTATIONS_TAG,
+                              spec.getBatchSizeBytes(),
+                              spec.getMaxNumMutations()))
+                      .withSideInputs(schemaView)
+                      .withOutputTags(
+                          BATCHABLE_MUTATIONS_TAG, 
TupleTagList.of(UNBATCHABLE_MUTATIONS_TAG)));
+
+      // Build a set of Mutation groups from the current bundle,
+      // sort them by table/key then split into batches.
       PCollection<Iterable<MutationGroup>> batchedMutations =
-          serialized
-              .apply("Partition input", 
ParDo.of(assignPartitionFn).withSideInputs(keySample))
-              .setCoder(KvCoder.of(StringUtf8Coder.of(), 
SerializedMutationCoder.of()))
-              .apply("Group by partition", GroupByKey.create())
+          filteredMutations
+              .get(BATCHABLE_MUTATIONS_TAG)
               .apply(
-                  "Batch mutations together",
+                  "Gather And Sort",
                   ParDo.of(
-                          new BatchFn(
+                          new GatherBundleAndSortFn(
                               spec.getBatchSizeBytes(),
                               spec.getMaxNumMutations(),
-                              spec.getSpannerConfig(),
+                              spec.getGroupingFactor(),
                               schemaView))
+                      .withSideInputs(schemaView))
+              .apply(
+                  "Create Batches",
+                  ParDo.of(
+                          new BatchFn(
+                              spec.getBatchSizeBytes(), 
spec.getMaxNumMutations(), schemaView))
                       .withSideInputs(schemaView));
 
       // Merge the batchable and unbatchable mutations and write to Spanner.
       PCollectionTuple result =
           PCollectionList.of(filteredMutations.get(UNBATCHABLE_MUTATIONS_TAG))
               .and(batchedMutations)
-              .apply("Flatten", Flatten.pCollections())
+              .apply("Merge", Flatten.pCollections())
               .apply(
                   "Write mutations to Spanner",
                   ParDo.of(
                           new WriteToSpannerFn(
-                              spec.getSpannerConfig(), spec.getFailureMode(), 
failedTag))
-                      .withOutputTags(mainTag, TupleTagList.of(failedTag)));
-      PCollection<MutationGroup> failedMutations = result.get(failedTag);
-      failedMutations.setCoder(SerializableCoder.of(MutationGroup.class));
+                              spec.getSpannerConfig(), spec.getFailureMode(), 
FAILED_MUTATIONS_TAG))
+                      .withOutputTags(MAIN_OUT_TAG, 
TupleTagList.of(FAILED_MUTATIONS_TAG)));
+
       return new SpannerWriteResult(
-          input.getPipeline(), result.get(mainTag), failedMutations, 
failedTag);
+          input.getPipeline(),
+          result.get(MAIN_OUT_TAG),
+          result.get(FAILED_MUTATIONS_TAG),
+          FAILED_MUTATIONS_TAG);
     }
 
-    private PTransform<PCollection<KV<String, byte[]>>, PCollection<KV<String, 
List<byte[]>>>>
-        createDefaultSampler() {
-      return Combine.perKey(
-          ApproximateQuantiles.ApproximateQuantilesCombineFn.create(
-              spec.getNumSamples(),
-              SerializableBytesComparator.INSTANCE,
-              MAX_NUM_KEYS,
-              1. / spec.getNumSamples()));
+    @VisibleForTesting
+    static MutationGroup decode(byte[] bytes) {
+      ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+      try {
+        return CODER.decode(bis);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @VisibleForTesting
+    static byte[] encode(MutationGroup g) {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      try {
+        CODER.encode(g, bos);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return bos.toByteArray();
     }
   }
 
   private static class ToMutationGroupFn extends DoFn<Mutation, MutationGroup> 
{
-
     @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
+    public void processElement(ProcessContext c) {
       Mutation value = c.element();
       c.output(MutationGroup.create(value));
     }
   }
 
-  /** Serializes mutations to ((table name, serialized key), serialized value) 
tuple. */
-  private static class SerializeMutationsFn extends DoFn<MutationGroup, 
SerializedMutation> {
+  /**
+   * Gathers a set of mutations together, gets the keys, encodes them to 
byte[], sorts them and then
+   * outputs the encoded sorted list.
+   *
+   * <p>Testing notes: With very small amounts of data, each mutation group is 
in a separate bundle,
+   * and as batching and sorting is over the bundle, this effectively means 
that no batching will
+   * occur. Therefore this DoFn has to be tested in isolation.
+   */
+  @VisibleForTesting
+  static class GatherBundleAndSortFn extends DoFn<MutationGroup, 
Iterable<KV<byte[], byte[]>>> {
+    private final long maxBatchSizeBytes;
+    private final long maxNumMutations;
 
-    final PCollectionView<SpannerSchema> schemaView;
+    // total size of the current batch.
+    private long batchSizeBytes;
+    // total number of mutated cells including indices.
+    private long batchCells;
 
-    private SerializeMutationsFn(PCollectionView<SpannerSchema> schemaView) {
+    private final PCollectionView<SpannerSchema> schemaView;
+
+    private transient BufferedExternalSorter sorter = null;
+
+    GatherBundleAndSortFn(
+        long maxBatchSizeBytes,
+        long maxNumMutations,
+        long groupingFactor,
+        PCollectionView<SpannerSchema> schemaView) {
+      this.maxBatchSizeBytes = maxBatchSizeBytes * groupingFactor;
+      this.maxNumMutations = maxNumMutations * groupingFactor;
       this.schemaView = schemaView;
     }
 
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      MutationGroup g = c.element();
-      Mutation m = g.primary();
-      SpannerSchema schema = c.sideInput(schemaView);
-      String table = m.getTable();
-      MutationGroupEncoder mutationGroupEncoder = new 
MutationGroupEncoder(schema);
-
-      byte[] key;
-      if (m.getOperation() != Mutation.Op.DELETE) {
-        key = mutationGroupEncoder.encodeKey(m);
-      } else if (isPointDelete(m)) {
-        Key next = m.getKeySet().getKeys().iterator().next();
-        key = mutationGroupEncoder.encodeKey(m.getTable(), next);
+    @StartBundle
+    public void startBundle() throws Exception {
+      if (sorter == null) {
+        initSorter();
       } else {
-        // The key is left empty for non-point deletes, since there is no 
general way to batch them.
-        key = new byte[] {};
+        throw new IllegalStateException("Sorter should be null here");
       }
-      byte[] value = mutationGroupEncoder.encode(g);
-      c.output(SerializedMutation.create(table, key, value));
     }
-  }
-
-  private static class ExtractKeys extends DoFn<SerializedMutation, KV<String, 
byte[]>> {
 
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      SerializedMutation m = c.element();
-      c.output(KV.of(m.getTableName().toLowerCase(), m.getEncodedKey()));
+    private void initSorter() {
+      sorter =
+          BufferedExternalSorter.create(
+              
BufferedExternalSorter.options().withMemoryMB(2000).withTempLocation("/tmp"));
 
 Review comment:
   Removed BufferedExternalSorter

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 150410)
    Time Spent: 2h 40m  (was: 2.5h)

> Update SpannerIO to support unbounded writes
> --------------------------------------------
>
>                 Key: BEAM-5445
>                 URL: https://issues.apache.org/jira/browse/BEAM-5445
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Chamikara Jayalath
>            Priority: Major
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Currently, due to a known issue, streaming pipelines that use SpannerIO.Write 
> do not actually write to Spanner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to