[ 
https://issues.apache.org/jira/browse/BEAM-3516?focusedWorklogId=99277&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-99277
 ]

ASF GitHub Bot logged work on BEAM-3516:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/May/18 23:39
            Start Date: 07/May/18 23:39
    Worklog Time Spent: 10m 
      Work Description: jkff commented on a change in pull request #5297: 
[BEAM-3516] Spanner BatchFn does not respect mutation limits
URL: https://github.com/apache/beam/pull/5297#discussion_r186581470
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
 ##########
 @@ -19,68 +19,105 @@
 
 import com.google.auto.value.AutoValue;
 import com.google.cloud.spanner.Type;
-import com.google.common.base.Objects;
-import com.google.common.collect.ArrayListMultimap;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableTable;
+import com.google.common.collect.Maps;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
 
 /**
  * Encapsulates Cloud Spanner Schema.
  */
-class SpannerSchema implements Serializable {
-  private final List<String> tables;
-  private final ArrayListMultimap<String, Column> columns;
-  private final ArrayListMultimap<String, KeyPart> keyParts;
+@AutoValue
+abstract class SpannerSchema implements Serializable {
+  abstract ImmutableList<String> tables();
+  abstract ImmutableListMultimap<String, Column> columns();
+  abstract ImmutableListMultimap<String, KeyPart> keyParts();
+  abstract ImmutableTable<String, String, Long> cellsMutatedPerColumn();
+  abstract ImmutableMap<String, Long> cellsMutatedPerRow();
 
   public static Builder builder() {
-    return new Builder();
+    return new AutoValue_SpannerSchema.Builder();
   }
 
   /**
    * Builder for {@link SpannerSchema}.
    */
-  static class Builder {
-    private final ArrayListMultimap<String, Column> columns = ArrayListMultimap.create();
-    private final ArrayListMultimap<String, KeyPart> keyParts = ArrayListMultimap.create();
-
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract ImmutableList.Builder<String> tablesBuilder();
+    abstract ImmutableListMultimap.Builder<String, Column> columnsBuilder();
+    abstract ImmutableListMultimap.Builder<String, KeyPart> keyPartsBuilder();
+    abstract ImmutableTable.Builder<String, String, Long> cellsMutatedPerColumnBuilder();
+    abstract ImmutableMap.Builder<String, Long> cellsMutatedPerRowBuilder();
+
+    abstract ImmutableListMultimap<String, Column> columns();
+    abstract ImmutableTable<String, String, Long> cellsMutatedPerColumn();
+
+    @VisibleForTesting
     public Builder addColumn(String table, String name, String type) {
-      addColumn(table, Column.create(name.toLowerCase(), type));
-      return this;
+      return addColumn(table, name, type, 1L);
     }
 
-    private Builder addColumn(String table, Column column) {
-      columns.put(table.toLowerCase(), column);
+    public Builder addColumn(String table, String name, String type, long cellsMutated) {
+      String tableLower = table.toLowerCase();
+      String nameLower = name.toLowerCase();
+
+      columnsBuilder().put(tableLower, Column.create(nameLower, type));
+      cellsMutatedPerColumnBuilder().put(tableLower, nameLower, cellsMutated);
       return this;
     }
 
     public Builder addKeyPart(String table, String column, boolean desc) {
-      keyParts.put(table.toLowerCase(), KeyPart.create(column.toLowerCase(), desc));
+      keyPartsBuilder().put(table.toLowerCase(), KeyPart.create(column.toLowerCase(), desc));
       return this;
     }
 
-    public SpannerSchema build() {
-      return new SpannerSchema(columns, keyParts);
-    }
-  }
+    abstract SpannerSchema autoBuild();
 
 Review comment:
   Is this one of the supported builder method names for AutoValue? I thought
   it was supposed to be called build().

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 99277)

> SpannerWriteGroupFn does not respect mutation limits
> ----------------------------------------------------
>
>                 Key: BEAM-3516
>                 URL: https://issues.apache.org/jira/browse/BEAM-3516
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>    Affects Versions: 2.2.0
>            Reporter: Ryan Gordon
>            Assignee: Chamikara Jayalath
>            Priority: Major
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> When using SpannerIO.write() with a large batch, or with a table that has
> indexes, it's very possible to hit the Spanner mutation limit and fail with
> the following error:
> {quote}Jan 02, 2018 2:42:59 PM 
> org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
> SEVERE: 2018-01-02T22:42:57.873Z: (3e7c871d215e890b): 
> com.google.cloud.spanner.SpannerException: INVALID_ARGUMENT: 
> io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The transaction contains 
> too many mutations. Insert and update operations count with the multiplicity 
> of the number of columns they affect. For example, inserting values into one 
> key column and four non-key columns count as five mutations total for the 
> insert. Delete and delete range operations count as one mutation regardless 
> of the number of columns affected. The total mutation count includes any 
> changes to indexes that the transaction generates. Please reduce the number 
> of writes, or use fewer indexes. (Maximum number: 20000)
> links {
>  description: "Cloud Spanner limits documentation."
>  url: "https://cloud.google.com/spanner/docs/limits"
> }
>  at com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionPreformatted(SpannerExceptionFactory.java:119)
>  at com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException(SpannerExceptionFactory.java:43)
>  at com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException(SpannerExceptionFactory.java:80)
>  at com.google.cloud.spanner.spi.v1.GrpcSpannerRpc.get(GrpcSpannerRpc.java:404)
>  at com.google.cloud.spanner.spi.v1.GrpcSpannerRpc.commit(GrpcSpannerRpc.java:376)
>  at com.google.cloud.spanner.SpannerImpl$SessionImpl$2.call(SpannerImpl.java:729)
>  at com.google.cloud.spanner.SpannerImpl$SessionImpl$2.call(SpannerImpl.java:726)
>  at com.google.cloud.spanner.SpannerImpl.runWithRetries(SpannerImpl.java:200)
>  at com.google.cloud.spanner.SpannerImpl$SessionImpl.writeAtLeastOnce(SpannerImpl.java:725)
>  at com.google.cloud.spanner.SessionPool$PooledSession.writeAtLeastOnce(SessionPool.java:248)
>  at com.google.cloud.spanner.DatabaseClientImpl.writeAtLeastOnce(DatabaseClientImpl.java:37)
>  at org.apache.beam.sdk.io.gcp.spanner.SpannerWriteGroupFn.flushBatch(SpannerWriteGroupFn.java:108)
>  at org.apache.beam.sdk.io.gcp.spanner.SpannerWriteGroupFn.processElement(SpannerWriteGroupFn.java:79)
> {quote}
>  
> As a workaround we can override the "withBatchSizeBytes" to something much 
> smaller:
> {quote}mutations.apply("Write", SpannerIO
>    .write()
>    // Artificially reduce the max batch size b/c the batcher currently doesn't
>    // take into account the 20000 mutation multiplicity limit
>    .withBatchSizeBytes(1024) // 1KB
>    .withProjectId("#PROJECTID#")
>    .withInstanceId("#INSTANCE#")
>    .withDatabaseId("#DATABASE#")
>  );
> {quote}
> While this is not as efficient, it at least allows the write to work
> consistently.
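
The counting rule quoted in the error message (an insert or update counts once per affected column, a delete counts once) suggests batching by mutated cells rather than by bytes. The following is a minimal sketch of that idea only, not the code from the pull request: the class MutationCellBatcher and its Op type are hypothetical stand-ins, not Beam or Spanner client classes, and the sketch deliberately ignores the extra cells contributed by index changes, which the error message says also count toward the limit.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of Beam or the Spanner client. */
final class MutationCellBatcher {
  /** Per-commit limit quoted in the error message ("Maximum number: 20000"). */
  static final long MAX_CELLS_PER_COMMIT = 20_000L;

  /** Simplified stand-in for a mutation: a delete, or a write touching N columns. */
  static final class Op {
    final boolean isDelete;
    final int columnsAffected;

    Op(boolean isDelete, int columnsAffected) {
      this.isDelete = isDelete;
      this.columnsAffected = columnsAffected;
    }
  }

  /** Deletes count as one; inserts and updates count once per affected column. */
  static long cellsFor(Op op) {
    return op.isDelete ? 1L : op.columnsAffected;
  }

  /** Greedily groups ops so that no batch exceeds maxCells (index cells ignored). */
  static List<List<Op>> batch(List<Op> ops, long maxCells) {
    List<List<Op>> batches = new ArrayList<>();
    List<Op> current = new ArrayList<>();
    long cellsInCurrent = 0;
    for (Op op : ops) {
      long cells = cellsFor(op);
      // Flush before adding an op that would push the batch over the limit.
      if (!current.isEmpty() && cellsInCurrent + cells > maxCells) {
        batches.add(current);
        current = new ArrayList<>();
        cellsInCurrent = 0;
      }
      current.add(op);
      cellsInCurrent += cells;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```

For example, 5000 inserts of five columns each amount to 25000 cells, so this sketch would split them into one batch of 4000 inserts and one of 1000. An op that on its own exceeds the limit still ends up in a batch by itself; a real implementation would need to decide how to handle that case.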



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
