[ https://issues.apache.org/jira/browse/BEAM-3516?focusedWorklogId=99701&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-99701 ]
ASF GitHub Bot logged work on BEAM-3516:
----------------------------------------
Author: ASF GitHub Bot
Created on: 08/May/18 20:20
Start Date: 08/May/18 20:20
Worklog Time Spent: 10m
Work Description: jkff commented on a change in pull request #5297:
[BEAM-3516] Spanner BatchFn does not respect mutation limits
URL: https://github.com/apache/beam/pull/5297#discussion_r186854696
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationCellCounter.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.cloud.spanner.KeySet;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Mutation.Op;
+import com.google.common.collect.Iterables;
+
+final class MutationCellCounter {
+  // Prevent construction.
+  private MutationCellCounter() {
+  }
+
+  /**
+   * Count the number of cells modified by {@link MutationGroup}.
+   */
+  public static long countOf(SpannerSchema spannerSchema, MutationGroup mutationGroup) {
+    long mutatedCells = 0L;
+    for (Mutation mutation : mutationGroup) {
+      if (mutation.getOperation() != Op.DELETE) {
+        // sum the cells of the columns included in the mutation
+        for (String column : mutation.getColumns()) {
+          mutatedCells += spannerSchema.getCellsMutatedPerColumn(mutation.getTable(), column);
+        }
+      } else {
+        // deletes are a little less obvious...
+        // for single key deletes simply sum up all the columns in the schema
+        // range deletes are broken up into batches already and can be ignored
Review comment:
Can we verify that the group doesn't include range deletes, and throw an
exception if it does?
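The following is a minimal sketch of the check the reviewer asks for. It is written against a hypothetical, simplified model rather than the Spanner client. `SimpleMutation`, its `pointKeyCount` and `rangeCount` fields, and the nested per-table map of per-column cell costs are all stand-ins invented for illustration. A cost above 1, such as `email` below, is assumed to represent a column that also feeds an index.

The sketch counts cells as follows:
- Writes sum the per-column cost.
- Single-key deletes are charged every column of the table, following the comment in the diff.
- Any range delete makes the method throw instead of being silently skipped.

```java
import java.util.List;
import java.util.Map;

public class CellCountSketch {

  /** Hypothetical stand-in for Spanner's mutation operations. */
  enum Op { INSERT, UPDATE, INSERT_OR_UPDATE, REPLACE, DELETE }

  /** Hypothetical stand-in for a mutation; not the Spanner client class. */
  static final class SimpleMutation {
    final Op op;
    final String table;
    final List<String> columns; // columns written (ignored for deletes)
    final int pointKeyCount;    // single keys deleted (deletes only)
    final int rangeCount;       // key ranges deleted (deletes only)

    SimpleMutation(Op op, String table, List<String> columns,
                   int pointKeyCount, int rangeCount) {
      this.op = op;
      this.table = table;
      this.columns = columns;
      this.pointKeyCount = pointKeyCount;
      this.rangeCount = rangeCount;
    }
  }

  /** Counts cells touched by a group; rejects range deletes. */
  static long countOf(Map<String, Map<String, Long>> cellsPerColumn,
                      List<SimpleMutation> group) {
    long mutatedCells = 0L;
    for (SimpleMutation m : group) {
      Map<String, Long> tableCols = cellsPerColumn.get(m.table);
      if (tableCols == null) {
        throw new IllegalArgumentException("Unknown table: " + m.table);
      }
      if (m.op != Op.DELETE) {
        // Writes: sum the cost of each column included in the mutation.
        for (String column : m.columns) {
          mutatedCells += tableCols.getOrDefault(column, 0L);
        }
      } else {
        // The guard the reviewer asked for: fail fast on range deletes.
        if (m.rangeCount > 0) {
          throw new IllegalArgumentException(
              "Range deletes are not allowed in a mutation group (table " + m.table + ")");
        }
        // Single-key deletes: charge every column of the table per key.
        long cellsPerRow = 0L;
        for (long cells : tableCols.values()) {
          cellsPerRow += cells;
        }
        mutatedCells += cellsPerRow * m.pointKeyCount;
      }
    }
    return mutatedCells;
  }

  public static void main(String[] args) {
    Map<String, Map<String, Long>> schema =
        Map.of("users", Map.of("id", 1L, "name", 1L, "email", 2L));
    List<SimpleMutation> group = List.of(
        new SimpleMutation(Op.INSERT, "users", List.of("id", "name", "email"), 0, 0),
        new SimpleMutation(Op.DELETE, "users", List.of(), 2, 0));
    // insert: 1 + 1 + 2 = 4 cells; two single-key deletes: 2 * 4 = 8 cells
    System.out.println(countOf(schema, group)); // prints 12

    try {
      countOf(schema, List.of(new SimpleMutation(Op.DELETE, "users", List.of(), 0, 1)));
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

With the real client, the range check would most likely inspect the mutation's `KeySet` for ranges. That would fit the `KeySet` and `Iterables` imports already present in the diff, but this is an inference, not something the diff shows.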
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 99701)
Time Spent: 3h 50m (was: 3h 40m)
> SpannerWriteGroupFn does not respect mutation limits
> ----------------------------------------------------
>
> Key: BEAM-3516
> URL: https://issues.apache.org/jira/browse/BEAM-3516
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Affects Versions: 2.2.0
> Reporter: Ryan Gordon
> Assignee: Mairbek Khadikov
> Priority: Major
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> When using SpannerIO.write() with a large batch, or with a table that has
> indexes, it is quite possible to hit the Spanner mutation limit and fail
> with the following error:
> {quote}Jan 02, 2018 2:42:59 PM
> org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
> SEVERE: 2018-01-02T22:42:57.873Z: (3e7c871d215e890b):
> com.google.cloud.spanner.SpannerException: INVALID_ARGUMENT:
> io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The transaction contains
> too many mutations. Insert and update operations count with the multiplicity
> of the number of columns they affect. For example, inserting values into one
> key column and four non-key columns count as five mutations total for the
> insert. Delete and delete range operations count as one mutation regardless
> of the number of columns affected. The total mutation count includes any
> changes to indexes that the transaction generates. Please reduce the number
> of writes, or use fewer indexes. (Maximum number: 20000)
> links {
> description: "Cloud Spanner limits documentation."
> url: "https://cloud.google.com/spanner/docs/limits"
> }
> at com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionPreformatted(SpannerExceptionFactory.java:119)
> at com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException(SpannerExceptionFactory.java:43)
> at com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException(SpannerExceptionFactory.java:80)
> at com.google.cloud.spanner.spi.v1.GrpcSpannerRpc.get(GrpcSpannerRpc.java:404)
> at com.google.cloud.spanner.spi.v1.GrpcSpannerRpc.commit(GrpcSpannerRpc.java:376)
> at com.google.cloud.spanner.SpannerImpl$SessionImpl$2.call(SpannerImpl.java:729)
> at com.google.cloud.spanner.SpannerImpl$SessionImpl$2.call(SpannerImpl.java:726)
> at com.google.cloud.spanner.SpannerImpl.runWithRetries(SpannerImpl.java:200)
> at com.google.cloud.spanner.SpannerImpl$SessionImpl.writeAtLeastOnce(SpannerImpl.java:725)
> at com.google.cloud.spanner.SessionPool$PooledSession.writeAtLeastOnce(SessionPool.java:248)
> at com.google.cloud.spanner.DatabaseClientImpl.writeAtLeastOnce(DatabaseClientImpl.java:37)
> at org.apache.beam.sdk.io.gcp.spanner.SpannerWriteGroupFn.flushBatch(SpannerWriteGroupFn.java:108)
> at org.apache.beam.sdk.io.gcp.spanner.SpannerWriteGroupFn.processElement(SpannerWriteGroupFn.java:79)
> {quote}
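The counting rule in the error message can be checked with simple arithmetic. This is illustrative only. `MutationLimitMath` and its methods are hypothetical names, and the 20,000 ceiling is taken from the error text. `indexMutationsPerRow` is an assumed input, because the message says only that index changes are included, not how many each row generates.

```java
public class MutationLimitMath {
  // Ceiling quoted in the error: "(Maximum number: 20000)".
  static final long MAX_MUTATIONS_PER_COMMIT = 20_000L;

  /** Mutations for one inserted/updated row: one per column, plus assumed index changes. */
  static long mutationsPerRow(int columnsWritten, int indexMutationsPerRow) {
    long perRow = (long) columnsWritten + indexMutationsPerRow;
    if (perRow <= 0) {
      throw new IllegalArgumentException("A row must cost at least one mutation");
    }
    return perRow;
  }

  /** Largest number of such rows that fits in one commit. */
  static long maxRowsPerCommit(int columnsWritten, int indexMutationsPerRow) {
    return MAX_MUTATIONS_PER_COMMIT / mutationsPerRow(columnsWritten, indexMutationsPerRow);
  }

  public static void main(String[] args) {
    // The error's own example: 1 key column + 4 non-key columns = 5 mutations.
    System.out.println(mutationsPerRow(5, 0));  // 5
    System.out.println(maxRowsPerCommit(5, 0)); // 4000
    // Same row, assuming index maintenance adds 2 mutations per row.
    System.out.println(maxRowsPerCommit(5, 2)); // 2857
  }
}
```

Because the limit counts cells rather than bytes, a batch sized only by bytes can still exceed it when rows are small but touch many columns or indexes.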
>
> As a workaround, we can override "withBatchSizeBytes" with a much smaller
> value:
> {quote}mutations.apply("Write", SpannerIO
> .write()
> // Artificially reduce the max batch size b/c the batcher currently doesn't
> // take into account the 20000 mutation multiplicity limit
> .withBatchSizeBytes(1024) // 1KB
> .withProjectId("#PROJECTID#")
> .withInstanceId("#INSTANCE#")
> .withDatabaseId("#DATABASE#")
> );
> {quote}
> While this is less efficient, it at least allows the write to succeed consistently.
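Shrinking `withBatchSizeBytes` bounds the mutation count only indirectly. A more direct approach is to close a batch when either a byte budget or a cell budget would be exceeded.

The following is a minimal sketch under that assumption. `DualLimitBatcher` and its caller-supplied sizing functions are hypothetical. They are not the SpannerIO implementation or the change in pull request #5297.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

/** Hypothetical batcher that flushes when either a byte or a cell budget would overflow. */
public class DualLimitBatcher<T> {
  private final long maxBytes;
  private final long maxCells;
  private final ToLongFunction<T> bytesOf;
  private final ToLongFunction<T> cellsOf;
  private final List<List<T>> flushed = new ArrayList<>();
  private List<T> current = new ArrayList<>();
  private long currentBytes = 0L;
  private long currentCells = 0L;

  public DualLimitBatcher(long maxBytes, long maxCells,
                          ToLongFunction<T> bytesOf, ToLongFunction<T> cellsOf) {
    this.maxBytes = maxBytes;
    this.maxCells = maxCells;
    this.bytesOf = bytesOf;
    this.cellsOf = cellsOf;
  }

  public void add(T element) {
    long bytes = bytesOf.applyAsLong(element);
    long cells = cellsOf.applyAsLong(element);
    // An element over the cell budget can never be committed within it.
    if (cells > maxCells) {
      throw new IllegalArgumentException("Element alone exceeds the cell limit: " + cells);
    }
    boolean wouldOverflow =
        currentBytes + bytes > maxBytes || currentCells + cells > maxCells;
    // Close the current batch first; an oversized-by-bytes element still gets its own batch.
    if (!current.isEmpty() && wouldOverflow) {
      flush();
    }
    current.add(element);
    currentBytes += bytes;
    currentCells += cells;
  }

  public void flush() {
    if (!current.isEmpty()) {
      flushed.add(current);
      current = new ArrayList<>();
      currentBytes = 0L;
      currentCells = 0L;
    }
  }

  public List<List<T>> batches() {
    return flushed;
  }

  public static void main(String[] args) {
    // Elements are {bytes, cells}: 1 MB byte budget, 20000-cell budget.
    DualLimitBatcher<long[]> batcher =
        new DualLimitBatcher<>(1_048_576L, 20_000L, e -> e[0], e -> e[1]);
    for (int i = 0; i < 10; i++) {
      batcher.add(new long[] {100L, 5_000L}); // small rows that touch many cells
    }
    batcher.flush();
    for (List<long[]> b : batcher.batches()) {
      System.out.println(b.size()); // 4, 4, 2
    }
  }
}
```

In the example, ten elements of 5,000 cells each are split into batches of 4, 4 and 2, even though their combined size is only 1,000 bytes. A byte limit alone would have sent them as a single 50,000-cell commit.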
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)