[ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150412&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150412
 ]

ASF GitHub Bot logged work on BEAM-5445:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Oct/18 15:15
            Start Date: 02/Oct/18 15:15
    Worklog Time Spent: 10m 
      Work Description: nielm commented on a change in pull request #6478: 
[BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds 
streaming support
URL: https://github.com/apache/beam/pull/6478#discussion_r221993914
 
 

 ##########
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 ##########
 @@ -1074,59 +1061,61 @@ public void setup() {
       batch = ImmutableList.builder();
       batchSizeBytes = 0;
       batchCells = 0;
-      spannerAccessor = spannerConfig.connectToSpanner();
-    }
-
-    @Teardown
-    public void teardown() {
-      spannerAccessor.close();
     }
 
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       SpannerSchema spannerSchema = c.sideInput(schemaView);
-      MutationGroupEncoder mutationGroupEncoder = new MutationGroupEncoder(spannerSchema);
 
-      KV<String, Iterable<SerializedMutation>> element = c.element();
-      for (SerializedMutation kv : element.getValue()) {
-        byte[] value = kv.getMutationGroupBytes();
-        MutationGroup mg = mutationGroupEncoder.decode(value);
+      // Iterate through list, outputting whenever a batch is complete.
+      for (KV<byte[], byte[]> kv : c.element()) {
+        MutationGroup mg = decode(kv.getValue());
+
         long groupSize = MutationSizeEstimator.sizeOf(mg);
         long groupCells = MutationCellCounter.countOf(spannerSchema, mg);
-        if (batchCells + groupCells > maxNumMutations
-            || batchSizeBytes + groupSize > maxBatchSizeBytes) {
-          ImmutableList<MutationGroup> mutations = batch.build();
-          c.output(mutations);
-          batch = ImmutableList.builder();
-          batchSizeBytes = 0;
-          batchCells = 0;
+        if (((batchCells + groupCells) > maxNumMutations)
+            || ((batchSizeBytes + groupSize) > maxBatchSizeBytes)) {
+          outputBatch(c);
 
 Review comment:
   The Beam docs do not say whether DoFns need to be thread-safe, but adding 
synchronized blocks does no harm.
   GatherBundleAndSortFn has been updated to be thread-safe.
   
   BatchFn has been rewritten to use only local variables, so it is now 
thread-safe.
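
   As a rough illustration of the local-variables-only approach, here is a 
minimal plain-Java sketch. It is not the code in this PR and does not use any 
Beam or Spanner classes: `LocalBatcher`, `Item`, and `batch` are hypothetical 
names, and `Item` only stands in for a mutation group with a size in bytes and 
a cell count. The guard against flushing an empty batch is also an assumption 
of the sketch, not something shown in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: batches items using only method-local state, so
// concurrent calls to batch() cannot interfere with each other.
final class LocalBatcher {

  // Stand-in for a mutation group; only its size and cell count matter here.
  static final class Item {
    final long sizeBytes;
    final long cells;

    Item(long sizeBytes, long cells) {
      this.sizeBytes = sizeBytes;
      this.cells = cells;
    }
  }

  // Splits items into batches, starting a new batch whenever adding the next
  // item would exceed either limit.
  static List<List<Item>> batch(Iterable<Item> items, long maxCells, long maxBytes) {
    List<List<Item>> batches = new ArrayList<>();
    List<Item> current = new ArrayList<>();
    long batchCells = 0;
    long batchBytes = 0;

    for (Item item : items) {
      boolean overLimit =
          (batchCells + item.cells) > maxCells || (batchBytes + item.sizeBytes) > maxBytes;
      // Assumption of this sketch: never flush an empty batch, so a single
      // oversized item still ends up in a batch of its own.
      if (overLimit && !current.isEmpty()) {
        batches.add(current);
        current = new ArrayList<>();
        batchCells = 0;
        batchBytes = 0;
      }
      current.add(item);
      batchCells += item.cells;
      batchBytes += item.sizeBytes;
    }

    // Flush whatever remains after the last item.
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```

   Because the counters and the in-progress list live on the stack of each 
call rather than in instance fields, no synchronization is needed in this 
sketch.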

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 150412)
    Time Spent: 2h 50m  (was: 2h 40m)

> Update SpannerIO to support unbounded writes
> --------------------------------------------
>
>                 Key: BEAM-5445
>                 URL: https://issues.apache.org/jira/browse/BEAM-5445
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Chamikara Jayalath
>            Priority: Major
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Currently, due to a known issue, streaming pipelines that use SpannerIO.Write 
> do not actually write to Spanner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
