[ 
https://issues.apache.org/jira/browse/BEAM-3516?focusedWorklogId=101142&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101142
 ]

ASF GitHub Bot logged work on BEAM-3516:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/May/18 14:40
            Start Date: 11/May/18 14:40
    Worklog Time Spent: 10m 
      Work Description: NathanHowell closed pull request #4860: [BEAM-3516] 
Spanner BatchFn does not respect mutation limits
URL: https://github.com/apache/beam/pull/4860
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationCellEstimator.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationCellEstimator.java
new file mode 100644
index 00000000000..2b2e209b3e1
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationCellEstimator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.cloud.spanner.KeySet;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Mutation.Op;
+import com.google.common.collect.Iterables;
+
+final class MutationCellEstimator {
+  // Prevent construction.
+  private MutationCellEstimator() {
+  }
+
+  /**
+   * Estimate the number of cells modified in a {@link MutationGroup}.
+   */
+  public static long countOf(SpannerSchema spannerSchema, MutationGroup 
mutationGroup) {
+    long mutatedCells = 0L;
+    for (Mutation mutation : mutationGroup) {
+      if (mutation.getOperation() != Op.DELETE) {
+        // sum the cells of the columns included in the mutation
+        for (String column : mutation.getColumns()) {
+          mutatedCells += 
spannerSchema.getCellsMutatedPerColumn(mutation.getTable(), column);
+        }
+      } else {
+        // deletes are a little less obvious...
+        // for single key deletes simply sum up all the columns in the schema
+        // range deletes are broken up into batches already and can be ignored
+        final KeySet keySet = mutation.getKeySet();
+
+        final long rows = Iterables.size(keySet.getKeys());
+        if (rows > 0) {
+          mutatedCells += rows * 
spannerSchema.getCellsMutatedPerRow(mutation.getTable());
+        }
+      }
+    }
+
+    return mutatedCells;
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
index 1c16fa80e2c..64727cb50e7 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
@@ -59,8 +59,9 @@ public void processElement(ProcessContext c) throws Exception 
{
         String tableName = resultSet.getString(0);
         String columnName = resultSet.getString(1);
         String type = resultSet.getString(2);
+        long cellsMutated = resultSet.getLong(3);
 
-        builder.addColumn(tableName, columnName, type);
+        builder.addColumn(tableName, columnName, type, cellsMutated);
       }
 
       resultSet = readPrimaryKeyInfo(tx);
@@ -76,11 +77,28 @@ public void processElement(ProcessContext c) throws 
Exception {
   }
 
   private ResultSet readTableInfo(ReadOnlyTransaction tx) {
-    return tx.executeQuery(Statement.of(
-        "SELECT c.table_name, c.column_name, c.spanner_type"
-            + " FROM information_schema.columns as c"
-            + " WHERE c.table_catalog = '' AND c.table_schema = ''"
-            + " ORDER BY c.table_name, c.ordinal_position"));
+    // retrieve schema information for all tables, as well as aggregating the
+    // number of indexes that cover each column. this will be used to estimate
+    // the number of cells (table column plus indexes) mutated in an upsert 
operation
+    // in order to stay below the 20k threshold
+    return tx.executeQuery(Statement
+        .of("SELECT"
+            + "    c.table_name"
+            + "  , c.column_name"
+            + "  , c.spanner_type"
+            + "  , (1 + COALESCE(t.indices, 0)) AS cells_mutated"
+            + "  FROM ("
+            + "    SELECT c.table_name, c.column_name, c.spanner_type, 
c.ordinal_position"
+            + "     FROM information_schema.columns as c"
+            + "     WHERE c.table_catalog = '' AND c.table_schema = '') AS c"
+            + "  LEFT OUTER JOIN ("
+            + "    SELECT t.table_name, t.column_name, COUNT(*) AS indices"
+            + "      FROM information_schema.index_columns AS t "
+            + "      WHERE t.index_name != 'PRIMARY_KEY' AND t.table_catalog = 
''"
+            + "      AND t.table_schema = ''"
+            + "      GROUP BY t.table_name, t.column_name) AS t"
+            + "  USING (table_name, column_name)"
+            + "  ORDER BY c.table_name, c.ordinal_position"));
   }
 
   private ResultSet readPrimaryKeyInfo(ReadOnlyTransaction tx) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index ce1e51d2e25..0750d527948 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -34,10 +34,10 @@
 import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.TimestampBound;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.primitives.UnsignedBytes;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -181,7 +181,7 @@
 
   private static final long DEFAULT_BATCH_SIZE_BYTES = 1024 * 1024; // 1 MB
   // Max number of mutations to batch together.
-  private static final int MAX_NUM_MUTATIONS = 10000;
+  private static final int DEFAULT_MAX_NUM_MUTATIONS = 10000;
   // The maximum number of keys to fit in memory when computing approximate 
quantiles.
   private static final long MAX_NUM_KEYS = (long) 1e6;
   // TODO calculate number of samples based on the size of the input.
@@ -236,6 +236,7 @@ public static Write write() {
     return new AutoValue_SpannerIO_Write.Builder()
         .setSpannerConfig(SpannerConfig.create())
         .setBatchSizeBytes(DEFAULT_BATCH_SIZE_BYTES)
+        .setMaxNumMutations(DEFAULT_MAX_NUM_MUTATIONS)
         .setNumSamples(DEFAULT_NUM_SAMPLES)
         .build();
   }
@@ -609,10 +610,14 @@ public CreateTransaction 
withTimestampBound(TimestampBound timestampBound) {
   @AutoValue
   public abstract static class Write extends PTransform<PCollection<Mutation>, 
PDone> {
 
+    private long maxNumMutations;
+
     abstract SpannerConfig getSpannerConfig();
 
     abstract long getBatchSizeBytes();
 
+    abstract long getMaxNumMutations();
+
     abstract int getNumSamples();
 
     @Nullable
@@ -628,6 +633,8 @@ public CreateTransaction withTimestampBound(TimestampBound 
timestampBound) {
 
       abstract Builder setBatchSizeBytes(long batchSizeBytes);
 
+      abstract Builder setMaxNumMutations(long maxNumMutations);
+
       abstract Builder setNumSamples(int numSamples);
 
       abstract Builder setSampler(
@@ -706,6 +713,11 @@ public Write withBatchSizeBytes(long batchSizeBytes) {
       return toBuilder().setBatchSizeBytes(batchSizeBytes).build();
     }
 
+    /** Specifies the cell mutation limit. */
+    public Write withMaxNumMutations(long maxNumMutations) {
+      return toBuilder().setMaxNumMutations(maxNumMutations).build();
+    }
+
     @Override
     public PDone expand(PCollection<Mutation> input) {
       getSpannerConfig().validate();
@@ -785,7 +797,8 @@ public PDone expand(PCollection<MutationGroup> input) {
           .apply("Group by partition", GroupByKey.create())
           .apply(
               "Batch mutations together",
-              ParDo.of(new BatchFn(spec.getBatchSizeBytes(), 
spec.getSpannerConfig(), schemaView))
+              ParDo.of(new BatchFn(spec.getBatchSizeBytes(), 
spec.getMaxNumMutations(),
+                  spec.getSpannerConfig(), schemaView))
                   .withSideInputs(schemaView))
           .apply(
               "Write mutations to Spanner",
@@ -902,31 +915,35 @@ public AssignPartitionFn(PCollectionView<Map<String, 
List<byte[]>>> sampleView)
   private static class BatchFn
       extends DoFn<KV<String, Iterable<SerializedMutation>>, 
Iterable<Mutation>> {
 
-    private static final int MAX_RETRIES = 5;
-    private static final FluentBackoff BUNDLE_WRITE_BACKOFF = 
FluentBackoff.DEFAULT
-        
.withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5));
-
     private final long maxBatchSizeBytes;
+    private final long maxNumMutations;
     private final SpannerConfig spannerConfig;
     private final PCollectionView<SpannerSchema> schemaView;
 
     private transient SpannerAccessor spannerAccessor;
     // Current batch of mutations to be written.
-    private List<Mutation> mutations;
+    private transient ImmutableList.Builder<Mutation> batch;
     // total size of the current batch.
     private long batchSizeBytes;
+    // total number of mutated cells including indices.
+    private long batchCells;
 
-    private BatchFn(long maxBatchSizeBytes, SpannerConfig spannerConfig,
+    private BatchFn(
+        long maxBatchSizeBytes,
+        long maxNumMutations,
+        SpannerConfig spannerConfig,
         PCollectionView<SpannerSchema> schemaView) {
       this.maxBatchSizeBytes = maxBatchSizeBytes;
+      this.maxNumMutations = maxNumMutations;
       this.spannerConfig = spannerConfig;
       this.schemaView = schemaView;
     }
 
     @Setup
     public void setup() throws Exception {
-      mutations = new ArrayList<>();
+      batch = ImmutableList.builder();
       batchSizeBytes = 0;
+      batchCells = 0;
       spannerAccessor = spannerConfig.connectToSpanner();
     }
 
@@ -937,23 +954,37 @@ public void teardown() throws Exception {
 
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
-      MutationGroupEncoder mutationGroupEncoder = new 
MutationGroupEncoder(c.sideInput(schemaView));
+      SpannerSchema spannerSchema = c.sideInput(schemaView);
+      MutationGroupEncoder mutationGroupEncoder = new 
MutationGroupEncoder(spannerSchema);
+
       KV<String, Iterable<SerializedMutation>> element = c.element();
       for (SerializedMutation kv : element.getValue()) {
         byte[] value = kv.getMutationGroupBytes();
         MutationGroup mg = mutationGroupEncoder.decode(value);
-        Iterables.addAll(mutations, mg);
-        batchSizeBytes += MutationSizeEstimator.sizeOf(mg);
-        if (batchSizeBytes >= maxBatchSizeBytes || mutations.size() > 
MAX_NUM_MUTATIONS) {
-          c.output(mutations);
-          mutations = new ArrayList<>();
-          batchSizeBytes = 0;
+        long groupSize = MutationSizeEstimator.sizeOf(mg);
+        long groupCells = MutationCellEstimator.countOf(spannerSchema, mg);
+
+        if (batchCells + groupCells > maxNumMutations
+            || batchSizeBytes + groupSize > maxBatchSizeBytes) {
+          flush(c);
         }
+
+        batch.addAll(mg);
+        batchSizeBytes += groupSize;
+        batchCells += groupCells;
       }
+
+      flush(c);
+    }
+
+    private void flush(ProcessContext c) {
+      ImmutableList<Mutation> mutations = batch.build();
+
       if (!mutations.isEmpty()) {
         c.output(mutations);
-        mutations = new ArrayList<>();
+        batch = ImmutableList.builder();
         batchSizeBytes = 0;
+        batchCells = 0;
       }
     }
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
index f76c7376404..374c8389f68 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
@@ -19,68 +19,105 @@
 
 import com.google.auto.value.AutoValue;
 import com.google.cloud.spanner.Type;
-import com.google.common.base.Objects;
-import com.google.common.collect.ArrayListMultimap;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableTable;
+import com.google.common.collect.Maps;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
 
 /**
  * Encapsulates Cloud Spanner Schema.
  */
-class SpannerSchema implements Serializable {
-  private final List<String> tables;
-  private final ArrayListMultimap<String, Column> columns;
-  private final ArrayListMultimap<String, KeyPart> keyParts;
+@AutoValue
+abstract class SpannerSchema implements Serializable {
+  abstract ImmutableList<String> tables();
+  abstract ImmutableListMultimap<String, Column> columns();
+  abstract ImmutableListMultimap<String, KeyPart> keyParts();
+  abstract ImmutableTable<String, String, Long> cellsMutatedPerColumn();
+  abstract ImmutableMap<String, Long> cellsMutatedPerRow();
 
   public static Builder builder() {
-    return new Builder();
+    return new AutoValue_SpannerSchema.Builder();
   }
 
   /**
    * Builder for {@link SpannerSchema}.
    */
-  static class Builder {
-    private final ArrayListMultimap<String, Column> columns = 
ArrayListMultimap.create();
-    private final ArrayListMultimap<String, KeyPart> keyParts = 
ArrayListMultimap.create();
-
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract ImmutableList.Builder<String> tablesBuilder();
+    abstract ImmutableListMultimap.Builder<String, Column> columnsBuilder();
+    abstract ImmutableListMultimap.Builder<String, KeyPart> keyPartsBuilder();
+    abstract ImmutableTable.Builder<String, String, Long> 
cellsMutatedPerColumnBuilder();
+    abstract ImmutableMap.Builder<String, Long> cellsMutatedPerRowBuilder();
+
+    abstract ImmutableListMultimap<String, Column> columns();
+    abstract ImmutableTable<String, String, Long> cellsMutatedPerColumn();
+
+    @VisibleForTesting
     public Builder addColumn(String table, String name, String type) {
-      addColumn(table, Column.create(name.toLowerCase(), type));
-      return this;
+      return addColumn(table, name, type, 1L);
     }
 
-    private Builder addColumn(String table, Column column) {
-      columns.put(table.toLowerCase(), column);
+    public Builder addColumn(String table, String name, String type, long 
cellsMutated) {
+      String tableLower = table.toLowerCase();
+      String nameLower = name.toLowerCase();
+
+      columnsBuilder().put(tableLower, Column.create(nameLower, type));
+      cellsMutatedPerColumnBuilder().put(tableLower, nameLower, cellsMutated);
       return this;
     }
 
     public Builder addKeyPart(String table, String column, boolean desc) {
-      keyParts.put(table, KeyPart.create(column.toLowerCase(), desc));
+      keyPartsBuilder().put(table.toLowerCase(), 
KeyPart.create(column.toLowerCase(), desc));
       return this;
     }
 
-    public SpannerSchema build() {
-      return new SpannerSchema(columns, keyParts);
-    }
-  }
+    abstract SpannerSchema autoBuild();
+
+    public final SpannerSchema build() {
+      // precompute the number of cells that are mutated for operations 
affecting
+      // an entire row such as a single key delete.
+      cellsMutatedPerRowBuilder().putAll(Maps.transformValues(
+          cellsMutatedPerColumn().rowMap(),
+          entry -> entry.values().stream().mapToLong(Long::longValue).sum()));
 
-  private SpannerSchema(ArrayListMultimap<String, Column> columns,
-      ArrayListMultimap<String, KeyPart> keyParts) {
-    this.columns = columns;
-    this.keyParts = keyParts;
-    tables = new ArrayList<>(columns.keySet());
+      tablesBuilder().addAll(columns().keySet());
+
+      return autoBuild();
+    }
   }
 
   public List<String> getTables() {
-    return tables;
+    return tables();
   }
 
   public List<Column> getColumns(String table) {
-    return columns.get(table);
+    return columns().get(table.toLowerCase());
   }
 
   public List<KeyPart> getKeyParts(String table) {
-    return keyParts.get(table);
+    return keyParts().get(table.toLowerCase());
+  }
+
+  /**
+   * Return the total number of cells affected when the specified column is 
mutated.
+   */
+  public long getCellsMutatedPerColumn(String table, String column) {
+    return cellsMutatedPerColumn()
+        .row(table.toLowerCase())
+        .getOrDefault(column.toLowerCase(), 1L);
+  }
+
+  /**
+   * Return the total number of cells affected when the given row is deleted.
+   */
+  public long getCellsMutatedPerRow(String table) {
+    return cellsMutatedPerRow()
+        .getOrDefault(table.toLowerCase(), 1L);
   }
 
   @AutoValue
@@ -142,22 +179,4 @@ private static Type parseSpannerType(String spannerType) {
       throw new IllegalArgumentException("Unknown spanner type " + 
spannerType);
     }
   }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    SpannerSchema that = (SpannerSchema) o;
-    return Objects.equal(tables, that.tables) && Objects.equal(columns, 
that.columns) && Objects
-        .equal(keyParts, that.keyParts);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(tables, columns, keyParts);
-  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
index 25dc6dcf406..1e516842b9b 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
@@ -53,7 +53,9 @@
 
   private static Struct columnMetadata(String tableName, String columnName, 
String type) {
     return Struct.newBuilder().add("table_name", Value.string(tableName))
-        .add("column_name", Value.string(columnName)).add("spanner_type", 
Value.string(type))
+        .add("column_name", Value.string(columnName))
+        .add("spanner_type", Value.string(type))
+        .add("cells_mutated", Value.int64(3L))
         .build();
   }
 
@@ -66,7 +68,8 @@ private static Struct pkMetadata(String tableName, String 
columnName, String ord
   private void prepareColumnMetadata(ReadOnlyTransaction tx, List<Struct> 
rows) {
     Type type = Type.struct(Type.StructField.of("table_name", Type.string()),
         Type.StructField.of("column_name", Type.string()),
-        Type.StructField.of("spanner_type", Type.string()));
+        Type.StructField.of("spanner_type", Type.string()),
+        Type.StructField.of("cells_mutated", Type.int64()));
     when(tx.executeQuery(argThat(new ArgumentMatcher<Statement>() {
 
       @Override public boolean matches(Object argument) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
index 42f7fb32c6c..68737526253 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
@@ -66,6 +66,7 @@
  */
 @RunWith(JUnit4.class)
 public class SpannerIOWriteTest implements Serializable {
+  private static final long CELLS_PER_KEY = 7;
 
   @Rule public transient TestPipeline pipeline = TestPipeline.create();
   @Rule public transient ExpectedException thrown = ExpectedException.none();
@@ -80,12 +81,15 @@
 
     // Simplest schema: a table with int64 key
     preparePkMetadata(tx, Arrays.asList(pkMetadata("test", "key", "ASC")));
-    prepareColumnMetadata(tx, Arrays.asList(columnMetadata("test", "key", 
"INT64")));
+    prepareColumnMetadata(tx, Arrays.asList(columnMetadata("test", "key", 
"INT64", CELLS_PER_KEY)));
   }
 
-  private static Struct columnMetadata(String tableName, String columnName, 
String type) {
+  private static Struct columnMetadata(
+      String tableName, String columnName, String type, long cellsMutated) {
     return Struct.newBuilder().add("table_name", Value.string(tableName))
-        .add("column_name", Value.string(columnName)).add("spanner_type", 
Value.string(type))
+        .add("column_name", Value.string(columnName))
+        .add("spanner_type", Value.string(type))
+        .add("cells_mutated", Value.int64(cellsMutated))
         .build();
   }
 
@@ -98,7 +102,8 @@ private static Struct pkMetadata(String tableName, String 
columnName, String ord
   private void prepareColumnMetadata(ReadOnlyTransaction tx, List<Struct> 
rows) {
     Type type = Type.struct(Type.StructField.of("table_name", Type.string()),
         Type.StructField.of("column_name", Type.string()),
-        Type.StructField.of("spanner_type", Type.string()));
+        Type.StructField.of("spanner_type", Type.string()),
+        Type.StructField.of("cells_mutated", Type.int64()));
     when(tx.executeQuery(argThat(new ArgumentMatcher<Statement>() {
 
       @Override public boolean matches(Object argument) {
@@ -280,10 +285,9 @@ private void verifyBatches(Iterable<Mutation>... batches) {
 
   @Test
   @Category(NeedsRunner.class)
-  public void batchingGroups() throws Exception {
-
-    // Have a room to accumulate one more item.
-    long batchSize = MutationSizeEstimator.sizeOf(g(m(1L))) + 1;
+  public void sizeBatchingGroups() throws Exception {
+    // Accumulate two items per batch.
+    long batchSize = MutationSizeEstimator.sizeOf(g(m(1L))) * 2;
 
     PCollection<MutationGroup> mutations = pipeline.apply(Create.of(g(m(1L)), 
g(m(2L)), g(m(3L))));
     mutations.apply(SpannerIO.write()
@@ -304,6 +308,32 @@ public void batchingGroups() throws Exception {
         .writeAtLeastOnce(iterableOfSize(1));
   }
 
+  @Test
+  @Category(NeedsRunner.class)
+  public void cellBatchingGroups() throws Exception {
+    // Accumulate two items per batch.
+    long maxNumMutations = CELLS_PER_KEY * 2;
+
+    PCollection<MutationGroup> mutations = pipeline.apply(Create.of(g(m(1L)), 
g(m(2L)), g(m(3L))));
+    mutations.apply(SpannerIO.write()
+        .withProjectId("test-project")
+        .withInstanceId("test-instance")
+        .withDatabaseId("test-database")
+        .withServiceFactory(serviceFactory)
+        .withMaxNumMutations(maxNumMutations)
+        .withBatchSizeBytes(Integer.MAX_VALUE)
+        .withSampler(fakeSampler(m(1000L)))
+        .grouped());
+
+    pipeline.run();
+
+    // The content of batches is not deterministic. Just verify that the size 
is correct.
+    verify(serviceFactory.mockDatabaseClient(), times(1))
+        .writeAtLeastOnce(iterableOfSize(2));
+    verify(serviceFactory.mockDatabaseClient(), times(1))
+        .writeAtLeastOnce(iterableOfSize(1));
+  }
+
   @Test
   @Category(NeedsRunner.class)
   public void noBatching() throws Exception {
@@ -455,7 +485,7 @@ public void describeTo(Description description) {
 
   private static FakeSampler fakeSampler(Mutation... mutations) {
     SpannerSchema.Builder schema = SpannerSchema.builder();
-    schema.addColumn("test", "key", "INT64");
+    schema.addColumn("test", "key", "INT64", CELLS_PER_KEY);
     schema.addKeyPart("test", "key", false);
     return new FakeSampler(schema.build(), Arrays.asList(mutations));
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 101142)
    Time Spent: 5h 10m  (was: 5h)

> SpannerWriteGroupFn does not respect mutation limits
> ----------------------------------------------------
>
>                 Key: BEAM-3516
>                 URL: https://issues.apache.org/jira/browse/BEAM-3516
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>    Affects Versions: 2.2.0
>            Reporter: Ryan Gordon
>            Assignee: Mairbek Khadikov
>            Priority: Major
>          Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> When using SpannerIO.write(), if it happens to be a large batch or a table 
> with indexes its very possible it can hit the Spanner Mutations Limitation 
> and fail with the following error:
> {quote}Jan 02, 2018 2:42:59 PM 
> org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
> SEVERE: 2018-01-02T22:42:57.873Z: (3e7c871d215e890b): 
> com.google.cloud.spanner.SpannerException: INVALID_ARGUMENT: 
> io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The transaction contains 
> too many mutations. Insert and update operations count with the multiplicity 
> of the number of columns they affect. For example, inserting values into one 
> key column and four non-key columns count as five mutations total for the 
> insert. Delete and delete range operations count as one mutation regardless 
> of the number of columns affected. The total mutation count includes any 
> changes to indexes that the transaction generates. Please reduce the number 
> of writes, or use fewer indexes. (Maximum number: 20000)
> links {
>  description: "Cloud Spanner limits documentation."
>  url: "https://cloud.google.com/spanner/docs/limits";
> }
> at 
> com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionPreformatted(SpannerExceptionFactory.java:119)
>  at 
> com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException(SpannerExceptionFactory.java:43)
>  at 
> com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException(SpannerExceptionFactory.java:80)
>  at 
> com.google.cloud.spanner.spi.v1.GrpcSpannerRpc.get(GrpcSpannerRpc.java:404)
>  at 
> com.google.cloud.spanner.spi.v1.GrpcSpannerRpc.commit(GrpcSpannerRpc.java:376)
>  at 
> com.google.cloud.spanner.SpannerImpl$SessionImpl$2.call(SpannerImpl.java:729)
>  at 
> com.google.cloud.spanner.SpannerImpl$SessionImpl$2.call(SpannerImpl.java:726)
>  at com.google.cloud.spanner.SpannerImpl.runWithRetries(SpannerImpl.java:200)
>  at 
> com.google.cloud.spanner.SpannerImpl$SessionImpl.writeAtLeastOnce(SpannerImpl.java:725)
>  at 
> com.google.cloud.spanner.SessionPool$PooledSession.writeAtLeastOnce(SessionPool.java:248)
>  at 
> com.google.cloud.spanner.DatabaseClientImpl.writeAtLeastOnce(DatabaseClientImpl.java:37)
>  at 
> org.apache.beam.sdk.io.gcp.spanner.SpannerWriteGroupFn.flushBatch(SpannerWriteGroupFn.java:108)
>  at 
> org.apache.beam.sdk.io.gcp.spanner.SpannerWriteGroupFn.processElement(SpannerWriteGroupFn.java:79)
> {quote}
>  
> As a workaround we can override the "withBatchSizeBytes" to something much 
> smaller:
> {quote}mutations.apply("Write", SpannerIO
>    .write()
>    // Artificially reduce the max batch size b/c the batcher currently doesn't
>    // take into account the 20000 mutation multiplicity limit
>    .withBatchSizeBytes(1024) // 1KB
>    .withProjectId("#PROJECTID#")
>    .withInstanceId("#INSTANCE#")
>    .withDatabaseId("#DATABASE#")
>  );
> {quote}
> While this is not as efficient, it at least allows it to work consistently



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to