[
https://issues.apache.org/jira/browse/BEAM-3516?focusedWorklogId=100891&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100891
]
ASF GitHub Bot logged work on BEAM-3516:
----------------------------------------
Author: ASF GitHub Bot
Created on: 10/May/18 23:06
Start Date: 10/May/18 23:06
Worklog Time Spent: 10m
Work Description: jkff closed pull request #5297: [BEAM-3516] Spanner
BatchFn does not respect mutation limits
URL: https://github.com/apache/beam/pull/5297
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationCellCounter.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationCellCounter.java
new file mode 100644
index 00000000000..f0e218cc4dc
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationCellCounter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete;
+
+import com.google.cloud.spanner.KeySet;
+import com.google.cloud.spanner.Mutation;
+import com.google.cloud.spanner.Mutation.Op;
+import com.google.common.collect.Iterables;
+
+final class MutationCellCounter {
+ // Prevent construction.
+ private MutationCellCounter() {
+ }
+
+ /**
+ * Count the number of cells modified by {@link MutationGroup}.
+ */
+ public static long countOf(SpannerSchema spannerSchema, MutationGroup
mutationGroup) {
+ long mutatedCells = 0L;
+ for (Mutation mutation : mutationGroup) {
+ if (mutation.getOperation() == Op.DELETE) {
+ // For single key deletes sum up all the columns in the schema.
+ // There is no clear way to estimate range deletes, so they are
ignored.
+ if (isPointDelete(mutation)) {
+ final KeySet keySet = mutation.getKeySet();
+
+ final long rows = Iterables.size(keySet.getKeys());
+ mutatedCells += rows *
spannerSchema.getCellsMutatedPerRow(mutation.getTable());
+ }
+ } else {
+ // sum the cells of the columns included in the mutation
+ for (String column : mutation.getColumns()) {
+ mutatedCells +=
spannerSchema.getCellsMutatedPerColumn(mutation.getTable(), column);
+ }
+ }
+ }
+
+ return mutatedCells;
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
new file mode 100644
index 00000000000..ae162bdbb75
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.cloud.spanner.Mutation;
+import com.google.common.collect.Iterables;
+
+final class MutationUtils {
+ private MutationUtils() {
+
+ }
+
+ /**
+ * Check if the mutation is a delete by a single primary key operation.
+ * @param m mutation
+ * @return true if mutation is a point delete
+ */
+ public static boolean isPointDelete(Mutation m) {
+ return m.getOperation() == Mutation.Op.DELETE &&
Iterables.isEmpty(m.getKeySet().getRanges())
+ && Iterables.size(m.getKeySet().getKeys()) == 1;
+ }
+
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
index 250b87b3216..23f33b00f4a 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchema.java
@@ -59,8 +59,9 @@ public void processElement(ProcessContext c) throws Exception
{
String tableName = resultSet.getString(0);
String columnName = resultSet.getString(1);
String type = resultSet.getString(2);
+ long cellsMutated = resultSet.getLong(3);
- builder.addColumn(tableName, columnName, type);
+ builder.addColumn(tableName, columnName, type, cellsMutated);
}
resultSet = readPrimaryKeyInfo(tx);
@@ -76,11 +77,28 @@ public void processElement(ProcessContext c) throws
Exception {
}
private ResultSet readTableInfo(ReadOnlyTransaction tx) {
- return tx.executeQuery(Statement.of(
- "SELECT c.table_name, c.column_name, c.spanner_type"
- + " FROM information_schema.columns as c"
- + " WHERE c.table_catalog = '' AND c.table_schema = ''"
- + " ORDER BY c.table_name, c.ordinal_position"));
+ // retrieve schema information for all tables, as well as aggregating the
+ // number of indexes that cover each column. this will be used to estimate
+ // the number of cells (table column plus indexes) mutated in an upsert
operation
+ // in order to stay below the 20k threshold
+ return tx.executeQuery(Statement
+ .of("SELECT"
+ + " c.table_name"
+ + " , c.column_name"
+ + " , c.spanner_type"
+ + " , (1 + COALESCE(t.indices, 0)) AS cells_mutated"
+ + " FROM ("
+ + " SELECT c.table_name, c.column_name, c.spanner_type,
c.ordinal_position"
+ + " FROM information_schema.columns as c"
+ + " WHERE c.table_catalog = '' AND c.table_schema = '') AS c"
+ + " LEFT OUTER JOIN ("
+ + " SELECT t.table_name, t.column_name, COUNT(*) AS indices"
+ + " FROM information_schema.index_columns AS t "
+ + " WHERE t.index_name != 'PRIMARY_KEY' AND t.table_catalog =
''"
+ + " AND t.table_schema = ''"
+ + " GROUP BY t.table_name, t.column_name) AS t"
+ + " USING (table_name, column_name)"
+ + " ORDER BY c.table_name, c.ordinal_position"));
}
private ResultSet readPrimaryKeyInfo(ReadOnlyTransaction tx) {
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index c3924d861f3..13a64c21c0a 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -19,6 +19,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.sdk.io.gcp.spanner.MutationUtils.isPointDelete;
import com.google.auto.value.AutoValue;
import com.google.cloud.ServiceFactory;
@@ -34,10 +35,10 @@
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.primitives.UnsignedBytes;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -183,7 +184,7 @@
private static final long DEFAULT_BATCH_SIZE_BYTES = 1024L * 1024L; // 1 MB
// Max number of mutations to batch together.
- private static final int MAX_NUM_MUTATIONS = 10000;
+ private static final int DEFAULT_MAX_NUM_MUTATIONS = 5000;
// The maximum number of keys to fit in memory when computing approximate
quantiles.
private static final long MAX_NUM_KEYS = (long) 1e6;
// TODO calculate number of samples based on the size of the input.
@@ -240,6 +241,7 @@ public static Write write() {
return new AutoValue_SpannerIO_Write.Builder()
.setSpannerConfig(SpannerConfig.create())
.setBatchSizeBytes(DEFAULT_BATCH_SIZE_BYTES)
+ .setMaxNumMutations(DEFAULT_MAX_NUM_MUTATIONS)
.setNumSamples(DEFAULT_NUM_SAMPLES)
.setFailureMode(FailureMode.FAIL_FAST)
.build();
@@ -668,6 +670,8 @@ public CreateTransaction withTimestampBound(TimestampBound
timestampBound) {
abstract long getBatchSizeBytes();
+ abstract long getMaxNumMutations();
+
abstract int getNumSamples();
abstract FailureMode getFailureMode();
@@ -685,6 +689,8 @@ public CreateTransaction withTimestampBound(TimestampBound
timestampBound) {
abstract Builder setBatchSizeBytes(long batchSizeBytes);
+ abstract Builder setMaxNumMutations(long maxNumMutations);
+
abstract Builder setNumSamples(int numSamples);
abstract Builder setFailureMode(FailureMode failureMode);
@@ -775,6 +781,11 @@ public Write withFailureMode(FailureMode failureMode) {
return toBuilder().setFailureMode(failureMode).build();
}
+ /** Specifies the cell mutation limit. */
+ public Write withMaxNumMutations(long maxNumMutations) {
+ return toBuilder().setMaxNumMutations(maxNumMutations).build();
+ }
+
@Override
public SpannerWriteResult expand(PCollection<Mutation> input) {
getSpannerConfig().validate();
@@ -855,9 +866,11 @@ public SpannerWriteResult
expand(PCollection<MutationGroup> input) {
PCollectionTuple result = serialized
.apply("Partition input",
ParDo.of(assignPartitionFn).withSideInputs(keySample))
.setCoder(KvCoder.of(StringUtf8Coder.of(),
SerializedMutationCoder.of()))
- .apply("Group by partition", GroupByKey.create()).apply("Batch
mutations together",
- ParDo.of(new BatchFn(spec.getBatchSizeBytes(),
spec.getSpannerConfig(), schemaView))
- .withSideInputs(schemaView)).apply("Write mutations to
Spanner",
+ .apply("Group by partition", GroupByKey.create())
+ .apply("Batch mutations together", ParDo.of(new
BatchFn(spec.getBatchSizeBytes(),
+ spec.getMaxNumMutations(), spec.getSpannerConfig(), schemaView))
+ .withSideInputs(schemaView))
+ .apply("Write mutations to Spanner",
ParDo.of(new WriteToSpannerFn(spec.getSpannerConfig(),
spec.getFailureMode(),
failedTag))
.withOutputTags(mainTag, TupleTagList.of(failedTag)));
@@ -865,6 +878,7 @@ public SpannerWriteResult expand(PCollection<MutationGroup>
input) {
failedMutations.setCoder(SerializableCoder.of(MutationGroup.class));
return new SpannerWriteResult(input.getPipeline(), result.get(mainTag),
failedMutations,
failedTag);
+
}
private PTransform<PCollection<KV<String, byte[]>>, PCollection<KV<String,
List<byte[]>>>>
@@ -928,11 +942,6 @@ public void processElement(ProcessContext c) {
}
}
- private static boolean isPointDelete(Mutation m) {
- return m.getOperation() == Mutation.Op.DELETE &&
Iterables.isEmpty(m.getKeySet().getRanges())
- && Iterables.size(m.getKeySet().getKeys()) == 1;
- }
-
/**
* Assigns a partition to the mutation group token based on the sampled data.
*/
@@ -974,26 +983,34 @@ public AssignPartitionFn(PCollectionView<Map<String,
List<byte[]>>> sampleView)
extends DoFn<KV<String, Iterable<SerializedMutation>>,
Iterable<MutationGroup>> {
private final long maxBatchSizeBytes;
+ private final long maxNumMutations;
private final SpannerConfig spannerConfig;
private final PCollectionView<SpannerSchema> schemaView;
private transient SpannerAccessor spannerAccessor;
// Current batch of mutations to be written.
- private List<MutationGroup> mutations;
+ private transient ImmutableList.Builder<MutationGroup> batch;
// total size of the current batch.
private long batchSizeBytes;
+ // total number of mutated cells including indices.
+ private long batchCells;
- private BatchFn(long maxBatchSizeBytes, SpannerConfig spannerConfig,
+ private BatchFn(
+ long maxBatchSizeBytes,
+ long maxNumMutations,
+ SpannerConfig spannerConfig,
PCollectionView<SpannerSchema> schemaView) {
this.maxBatchSizeBytes = maxBatchSizeBytes;
+ this.maxNumMutations = maxNumMutations;
this.spannerConfig = spannerConfig;
this.schemaView = schemaView;
}
@Setup
public void setup() {
- mutations = new ArrayList<>();
+ batch = ImmutableList.builder();
batchSizeBytes = 0;
+ batchCells = 0;
spannerAccessor = spannerConfig.connectToSpanner();
}
@@ -1004,24 +1021,35 @@ public void teardown() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
- MutationGroupEncoder mutationGroupEncoder = new
MutationGroupEncoder(c.sideInput(schemaView));
+ SpannerSchema spannerSchema = c.sideInput(schemaView);
+ MutationGroupEncoder mutationGroupEncoder = new
MutationGroupEncoder(spannerSchema);
+
KV<String, Iterable<SerializedMutation>> element = c.element();
for (SerializedMutation kv : element.getValue()) {
byte[] value = kv.getMutationGroupBytes();
MutationGroup mg = mutationGroupEncoder.decode(value);
- mutations.add(mg);
- batchSizeBytes += MutationSizeEstimator.sizeOf(mg);
- if (batchSizeBytes >= maxBatchSizeBytes || mutations.size() >
MAX_NUM_MUTATIONS) {
+ long groupSize = MutationSizeEstimator.sizeOf(mg);
+ long groupCells = MutationCellCounter.countOf(spannerSchema, mg);
+ if (batchCells + groupCells > maxNumMutations
+ || batchSizeBytes + groupSize > maxBatchSizeBytes) {
+ ImmutableList<MutationGroup> mutations = batch.build();
c.output(mutations);
- mutations = new ArrayList<>();
+ batch = ImmutableList.builder();
batchSizeBytes = 0;
+ batchCells = 0;
}
+ batch.add(mg);
+ batchSizeBytes += groupSize;
+ batchCells += groupCells;
}
+ ImmutableList<MutationGroup> mutations = batch.build();
if (!mutations.isEmpty()) {
c.output(mutations);
- mutations = new ArrayList<>();
+ batch = ImmutableList.builder();
batchSizeBytes = 0;
+ batchCells = 0;
}
+
}
}
@@ -1083,5 +1111,5 @@ public void processElement(ProcessContext c) throws
Exception {
}
- private SpannerIO() {} // Prevent construction.
+ private SpannerIO() {} // Prevent construction.
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
index 96b1826091c..d6bb866d440 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerSchema.java
@@ -19,68 +19,106 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.spanner.Type;
-import com.google.common.base.Objects;
-import com.google.common.collect.ArrayListMultimap;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableTable;
+import com.google.common.collect.Maps;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.List;
/**
* Encapsulates Cloud Spanner Schema.
*/
-class SpannerSchema implements Serializable {
- private final List<String> tables;
- private final ArrayListMultimap<String, Column> columns;
- private final ArrayListMultimap<String, KeyPart> keyParts;
+@AutoValue
+abstract class SpannerSchema implements Serializable {
+ abstract ImmutableList<String> tables();
+ abstract ImmutableListMultimap<String, Column> columns();
+ abstract ImmutableListMultimap<String, KeyPart> keyParts();
+ abstract ImmutableTable<String, String, Long> cellsMutatedPerColumn();
+ abstract ImmutableMap<String, Long> cellsMutatedPerRow();
public static Builder builder() {
- return new Builder();
+ return new AutoValue_SpannerSchema.Builder();
}
/**
* Builder for {@link SpannerSchema}.
*/
- static class Builder {
- private final ArrayListMultimap<String, Column> columns =
ArrayListMultimap.create();
- private final ArrayListMultimap<String, KeyPart> keyParts =
ArrayListMultimap.create();
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setTables(ImmutableList<String> tablesBuilder);
+ abstract ImmutableListMultimap.Builder<String, Column> columnsBuilder();
+ abstract ImmutableListMultimap.Builder<String, KeyPart> keyPartsBuilder();
+ abstract ImmutableTable.Builder<String, String, Long>
cellsMutatedPerColumnBuilder();
+ abstract ImmutableMap.Builder<String, Long> cellsMutatedPerRowBuilder();
+
+ abstract ImmutableListMultimap<String, Column> columns();
+ abstract ImmutableTable<String, String, Long> cellsMutatedPerColumn();
+
+ @VisibleForTesting
public Builder addColumn(String table, String name, String type) {
- addColumn(table, Column.create(name.toLowerCase(), type));
- return this;
+ return addColumn(table, name, type, 1L);
}
- private Builder addColumn(String table, Column column) {
- columns.put(table.toLowerCase(), column);
+ public Builder addColumn(String table, String name, String type, long
cellsMutated) {
+ String tableLower = table.toLowerCase();
+ String nameLower = name.toLowerCase();
+
+ columnsBuilder().put(tableLower, Column.create(nameLower, type));
+ cellsMutatedPerColumnBuilder().put(tableLower, nameLower, cellsMutated);
return this;
}
public Builder addKeyPart(String table, String column, boolean desc) {
- keyParts.put(table.toLowerCase(), KeyPart.create(column.toLowerCase(),
desc));
+ keyPartsBuilder().put(table.toLowerCase(),
KeyPart.create(column.toLowerCase(), desc));
return this;
}
- public SpannerSchema build() {
- return new SpannerSchema(columns, keyParts);
- }
- }
+ abstract SpannerSchema autoBuild();
- private SpannerSchema(ArrayListMultimap<String, Column> columns,
- ArrayListMultimap<String, KeyPart> keyParts) {
- this.columns = columns;
- this.keyParts = keyParts;
- tables = new ArrayList<>(columns.keySet());
+ public final SpannerSchema build() {
+ // precompute the number of cells that are mutated for operations
affecting
+ // an entire row such as a single key delete.
+ cellsMutatedPerRowBuilder().putAll(Maps.transformValues(
+ cellsMutatedPerColumn().rowMap(),
+ entry -> entry.values().stream().mapToLong(Long::longValue).sum()));
+
+ setTables(ImmutableList.copyOf(columns().keySet()));
+
+ return autoBuild();
+ }
}
public List<String> getTables() {
- return tables;
+ return tables();
}
public List<Column> getColumns(String table) {
- return columns.get(table.toLowerCase());
+ return columns().get(table.toLowerCase());
}
public List<KeyPart> getKeyParts(String table) {
- return keyParts.get(table.toLowerCase());
+ return keyParts().get(table.toLowerCase());
+ }
+
+ /**
+ * Return the total number of cells affected when the specified column is
mutated.
+ */
+ public long getCellsMutatedPerColumn(String table, String column) {
+ return cellsMutatedPerColumn()
+ .row(table.toLowerCase())
+ .getOrDefault(column.toLowerCase(), 1L);
+ }
+
+ /**
+ * Return the total number of cells affected with the given row is deleted.
+ */
+ public long getCellsMutatedPerRow(String table) {
+ return cellsMutatedPerRow()
+ .getOrDefault(table.toLowerCase(), 1L);
}
@AutoValue
@@ -142,22 +180,4 @@ private static Type parseSpannerType(String spannerType) {
throw new IllegalArgumentException("Unknown spanner type " +
spannerType);
}
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- SpannerSchema that = (SpannerSchema) o;
- return Objects.equal(tables, that.tables) && Objects.equal(columns,
that.columns) && Objects
- .equal(keyParts, that.keyParts);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(tables, columns, keyParts);
- }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
index 25dc6dcf406..1e516842b9b 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/ReadSpannerSchemaTest.java
@@ -53,7 +53,9 @@
private static Struct columnMetadata(String tableName, String columnName,
String type) {
return Struct.newBuilder().add("table_name", Value.string(tableName))
- .add("column_name", Value.string(columnName)).add("spanner_type",
Value.string(type))
+ .add("column_name", Value.string(columnName))
+ .add("spanner_type", Value.string(type))
+ .add("cells_mutated", Value.int64(3L))
.build();
}
@@ -66,7 +68,8 @@ private static Struct pkMetadata(String tableName, String
columnName, String ord
private void prepareColumnMetadata(ReadOnlyTransaction tx, List<Struct>
rows) {
Type type = Type.struct(Type.StructField.of("table_name", Type.string()),
Type.StructField.of("column_name", Type.string()),
- Type.StructField.of("spanner_type", Type.string()));
+ Type.StructField.of("spanner_type", Type.string()),
+ Type.StructField.of("cells_mutated", Type.int64()));
when(tx.executeQuery(argThat(new ArgumentMatcher<Statement>() {
@Override public boolean matches(Object argument) {
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
index a67adc67c1e..7719ab712a9 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
@@ -71,6 +71,7 @@
*/
@RunWith(JUnit4.class)
public class SpannerIOWriteTest implements Serializable {
+ private static final long CELLS_PER_KEY = 7;
@Rule public transient TestPipeline pipeline = TestPipeline.create();
@Rule public transient ExpectedException thrown = ExpectedException.none();
@@ -85,12 +86,15 @@
// Simplest schema: a table with int64 key
preparePkMetadata(tx, Arrays.asList(pkMetadata("tEsT", "key", "ASC")));
- prepareColumnMetadata(tx, Arrays.asList(columnMetadata("tEsT", "key",
"INT64")));
+ prepareColumnMetadata(tx, Arrays.asList(columnMetadata("tEsT", "key",
"INT64", CELLS_PER_KEY)));
}
- private static Struct columnMetadata(String tableName, String columnName,
String type) {
+ private static Struct columnMetadata(
+ String tableName, String columnName, String type, long cellsMutated) {
return Struct.newBuilder().add("table_name", Value.string(tableName))
- .add("column_name", Value.string(columnName)).add("spanner_type",
Value.string(type))
+ .add("column_name", Value.string(columnName))
+ .add("spanner_type", Value.string(type))
+ .add("cells_mutated", Value.int64(cellsMutated))
.build();
}
@@ -103,7 +107,8 @@ private static Struct pkMetadata(String tableName, String
columnName, String ord
private void prepareColumnMetadata(ReadOnlyTransaction tx, List<Struct>
rows) {
Type type = Type.struct(Type.StructField.of("table_name", Type.string()),
Type.StructField.of("column_name", Type.string()),
- Type.StructField.of("spanner_type", Type.string()));
+ Type.StructField.of("spanner_type", Type.string()),
+ Type.StructField.of("cells_mutated", Type.int64()));
when(tx.executeQuery(argThat(new ArgumentMatcher<Statement>() {
@Override public boolean matches(Object argument) {
@@ -285,10 +290,9 @@ private void verifyBatches(Iterable<Mutation>... batches) {
@Test
@Category(NeedsRunner.class)
- public void batchingGroups() throws Exception {
-
- // Have a room to accumulate one more item.
- long batchSize = MutationSizeEstimator.sizeOf(g(m(1L))) + 1;
+ public void sizeBatchingGroups() throws Exception {
+ // Accumulate two items per batch.
+ long batchSize = MutationSizeEstimator.sizeOf(g(m(1L))) * 2;
PCollection<MutationGroup> mutations = pipeline.apply(Create.of(g(m(1L)),
g(m(2L)), g(m(3L))));
mutations.apply(SpannerIO.write()
@@ -309,6 +313,32 @@ public void batchingGroups() throws Exception {
.writeAtLeastOnce(iterableOfSize(1));
}
+ @Test
+ @Category(NeedsRunner.class)
+ public void cellBatchingGroups() throws Exception {
+ // Accumulate two items per batch.
+ long maxNumMutations = CELLS_PER_KEY * 2;
+
+ PCollection<MutationGroup> mutations = pipeline.apply(Create.of(g(m(1L)),
g(m(2L)), g(m(3L))));
+ mutations.apply(SpannerIO.write()
+ .withProjectId("test-project")
+ .withInstanceId("test-instance")
+ .withDatabaseId("test-database")
+ .withServiceFactory(serviceFactory)
+ .withMaxNumMutations(maxNumMutations)
+ .withBatchSizeBytes(Integer.MAX_VALUE)
+ .withSampler(fakeSampler(m(1000L)))
+ .grouped());
+
+ pipeline.run();
+
+ // The content of batches is not deterministic. Just verify that the size
is correct.
+ verify(serviceFactory.mockDatabaseClient(), times(1))
+ .writeAtLeastOnce(iterableOfSize(2));
+ verify(serviceFactory.mockDatabaseClient(), times(1))
+ .writeAtLeastOnce(iterableOfSize(1));
+ }
+
@Test
@Category(NeedsRunner.class)
public void noBatching() throws Exception {
@@ -499,7 +529,7 @@ public void describeTo(Description description) {
private static FakeSampler fakeSampler(Mutation... mutations) {
SpannerSchema.Builder schema = SpannerSchema.builder();
- schema.addColumn("test", "key", "INT64");
+ schema.addColumn("test", "key", "INT64", CELLS_PER_KEY);
schema.addKeyPart("test", "key", false);
return new FakeSampler(schema.build(), Arrays.asList(mutations));
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 100891)
Time Spent: 5h (was: 4h 50m)
> SpannerWriteGroupFn does not respect mutation limits
> ----------------------------------------------------
>
> Key: BEAM-3516
> URL: https://issues.apache.org/jira/browse/BEAM-3516
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Affects Versions: 2.2.0
> Reporter: Ryan Gordon
> Assignee: Mairbek Khadikov
> Priority: Major
> Time Spent: 5h
> Remaining Estimate: 0h
>
> When using SpannerIO.write(), if it happens to be a large batch or a table
> with indexes it's very possible it can hit the Spanner Mutations Limitation
> and fail with the following error:
> {quote}Jan 02, 2018 2:42:59 PM
> org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
> SEVERE: 2018-01-02T22:42:57.873Z: (3e7c871d215e890b):
> com.google.cloud.spanner.SpannerException: INVALID_ARGUMENT:
> io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The transaction contains
> too many mutations. Insert and update operations count with the multiplicity
> of the number of columns they affect. For example, inserting values into one
> key column and four non-key columns count as five mutations total for the
> insert. Delete and delete range operations count as one mutation regardless
> of the number of columns affected. The total mutation count includes any
> changes to indexes that the transaction generates. Please reduce the number
> of writes, or use fewer indexes. (Maximum number: 20000)
> links {
> description: "Cloud Spanner limits documentation."
> url: "https://cloud.google.com/spanner/docs/limits"
> }
> at
> com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionPreformatted(SpannerExceptionFactory.java:119)
> at
> com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException(SpannerExceptionFactory.java:43)
> at
> com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException(SpannerExceptionFactory.java:80)
> at
> com.google.cloud.spanner.spi.v1.GrpcSpannerRpc.get(GrpcSpannerRpc.java:404)
> at
> com.google.cloud.spanner.spi.v1.GrpcSpannerRpc.commit(GrpcSpannerRpc.java:376)
> at
> com.google.cloud.spanner.SpannerImpl$SessionImpl$2.call(SpannerImpl.java:729)
> at
> com.google.cloud.spanner.SpannerImpl$SessionImpl$2.call(SpannerImpl.java:726)
> at com.google.cloud.spanner.SpannerImpl.runWithRetries(SpannerImpl.java:200)
> at
> com.google.cloud.spanner.SpannerImpl$SessionImpl.writeAtLeastOnce(SpannerImpl.java:725)
> at
> com.google.cloud.spanner.SessionPool$PooledSession.writeAtLeastOnce(SessionPool.java:248)
> at
> com.google.cloud.spanner.DatabaseClientImpl.writeAtLeastOnce(DatabaseClientImpl.java:37)
> at
> org.apache.beam.sdk.io.gcp.spanner.SpannerWriteGroupFn.flushBatch(SpannerWriteGroupFn.java:108)
> at
> org.apache.beam.sdk.io.gcp.spanner.SpannerWriteGroupFn.processElement(SpannerWriteGroupFn.java:79)
> {quote}
>
> As a workaround we can override the "withBatchSizeBytes" to something much
> smaller:
> {quote}mutations.apply("Write", SpannerIO
> .write()
> // Artificially reduce the max batch size b/c the batcher currently doesn't
> // take into account the 20000 mutation multiplicity limit
> .withBatchSizeBytes(1024) // 1KB
> .withProjectId("#PROJECTID#")
> .withInstanceId("#INSTANCE#")
> .withDatabaseId("#DATABASE#")
> );
> {quote}
> While this is not as efficient, it at least allows it to work consistently
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)