Repository: beam
Updated Branches:
  refs/heads/master 3dc454a9b -> 9ed0af8f2


[BEAM-2439] Dynamic sizing of Datastore write RPCs

This stops the Datastore connector from always sending 500 entities per RPC.
Instead, it starts at a smaller batch size (200 entities), which is more likely
to complete within the RPC deadline even in adverse conditions, and then
increases or reduces the batch size in response to the measured latency of
recent requests.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0292a24f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0292a24f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0292a24f

Branch: refs/heads/master
Commit: 0292a24f9c88796542bff55031d84c11f0ab6b16
Parents: 3dc454a
Author: Colin Phipps <[email protected]>
Authored: Mon May 15 14:18:16 2017 +0000
Committer: Luke Cwik <[email protected]>
Committed: Thu Jun 22 13:33:52 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 124 ++++++++++++++++---
 .../sdk/io/gcp/datastore/MovingAverage.java     |  50 ++++++++
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   |  72 ++++++++++-
 3 files changed, 225 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0292a24f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 06b9c8a..e67f4b2 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -201,11 +201,31 @@ public class DatastoreV1 {
   DatastoreV1() {}
 
   /**
-   * Cloud Datastore has a limit of 500 mutations per batch operation, so we 
flush
-   * changes to Datastore every 500 entities.
+   * The number of entity updates written per RPC, initially. We buffer 
updates in the connector and
+   * write a batch to Datastore once we have collected a certain number. This 
is the initial batch
+   * size; it is adjusted at runtime based on the performance of previous 
writes (see {@link
+   * DatastoreV1.WriteBatcher}).
+   *
+   * <p>Testing has found that a batch of 200 entities will generally finish 
within the timeout even
+   * in adverse conditions.
+   */
+  @VisibleForTesting
+  static final int DATASTORE_BATCH_UPDATE_ENTITIES_START = 200;
+
+  /**
+   * When choosing the number of updates in a single RPC, never exceed the 
maximum allowed by the
+   * API (500 mutations per commit).
    */
   @VisibleForTesting
-  static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
+  static final int DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT = 500;
+
+  /**
+   * When choosing the number of updates in a single RPC, do not go below this 
value.  The actual
+   * number of entities per request may be lower when we flush for the end of 
a bundle or if we hit
+   * {@link DatastoreV1#DATASTORE_BATCH_UPDATE_BYTES_LIMIT}.
+   */
+  @VisibleForTesting
+  static final int DATASTORE_BATCH_UPDATE_ENTITIES_MIN = 10;
 
   /**
    * Cloud Datastore has a limit of 10MB per RPC, so we also flush if the 
total size of mutations
@@ -1107,18 +1127,74 @@ public class DatastoreV1 {
     }
   }
 
+  /** Determines batch sizes for commit RPCs. */
+  @VisibleForTesting
+  interface WriteBatcher {
+    /** Call before using this WriteBatcher. */
+    void start();
+
+    /**
+     * Reports the latency of a previous commit RPC, and the number of mutations that it contained.
+     *
+     * @param timeSinceEpochMillis wall-clock time (ms since epoch) at which the RPC completed
+     * @param latencyMillis how long the RPC took, in milliseconds
+     * @param numMutations number of mutations in the committed request
+     */
+    void addRequestLatency(long timeSinceEpochMillis, long latencyMillis, int numMutations);
+
+    /** Returns the number of entities to include in the next CommitRequest. */
+    int nextBatchSize(long timeSinceEpochMillis);
+  }
+
+  /**
+   * Determines batch sizes for commit RPCs based on past performance.
+   *
+   * <p>It aims for a target response time per RPC: it uses the response times for previous RPCs
+   * and the number of entities contained in them, calculates a rolling average time-per-entity,
+   * and chooses the number of entities for future writes to hit the target time.
+   *
+   * <p>This enables us to send large batches without sending over-large requests in the case of
+   * expensive entity writes that may timeout before the server can apply them all.
+   */
+  @VisibleForTesting
+  static class WriteBatcherImpl implements WriteBatcher, Serializable {
+    /** Target time per RPC for writes. */
+    static final int DATASTORE_BATCH_TARGET_LATENCY_MS = 5000;
+
+    /**
+     * Rolling mean of the observed per-entity commit latency, in milliseconds. Not serialized; it
+     * is (re)created by {@link #start()}.
+     */
+    private transient MovingAverage meanLatencyPerEntityMs;
+
+    @Override
+    public void start() {
+      meanLatencyPerEntityMs = new MovingAverage(
+          120000 /* sample period 2 minutes */, 10000 /* sample interval 10s */,
+          1 /* numSignificantBuckets */, 1 /* numSignificantSamples */);
+    }
+
+    /**
+     * Records the per-entity latency of a finished commit. A report with no mutations carries no
+     * per-entity information (and would otherwise divide by zero), so it is ignored.
+     */
+    @Override
+    public void addRequestLatency(long timeSinceEpochMillis, long latencyMillis, int numMutations) {
+      if (numMutations <= 0) {
+        return;
+      }
+      meanLatencyPerEntityMs.add(timeSinceEpochMillis, latencyMillis / numMutations);
+    }
+
+    /**
+     * Returns {@link DatastoreV1#DATASTORE_BATCH_UPDATE_ENTITIES_START} while the moving average
+     * has no usable value; otherwise the number of entities expected to take
+     * {@link #DATASTORE_BATCH_TARGET_LATENCY_MS}, clamped to the range
+     * [{@link DatastoreV1#DATASTORE_BATCH_UPDATE_ENTITIES_MIN},
+     * {@link DatastoreV1#DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT}].
+     */
+    @Override
+    public int nextBatchSize(long timeSinceEpochMillis) {
+      if (!meanLatencyPerEntityMs.hasValue(timeSinceEpochMillis)) {
+        return DATASTORE_BATCH_UPDATE_ENTITIES_START;
+      }
+      // The per-entity mean is an integer and can round down to 0 for fast writes; floor it at
+      // 1ms so the division below is always defined.
+      long recentMeanLatency = Math.max(meanLatencyPerEntityMs.get(timeSinceEpochMillis), 1);
+      return (int) Math.max(DATASTORE_BATCH_UPDATE_ENTITIES_MIN,
+          Math.min(DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT,
+            DATASTORE_BATCH_TARGET_LATENCY_MS / recentMeanLatency));
+    }
+  }
+
   /**
    * {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations 
are written in
-   * batches, where the maximum batch size is {@link 
DatastoreV1#DATASTORE_BATCH_UPDATE_LIMIT}.
+   * batches; see {@link DatastoreV1.WriteBatcherImpl}.
    *
    * <p>See <a
    * href="https://cloud.google.com/datastore/docs/concepts/entities";>
    * Datastore: Entities, Properties, and Keys</a> for information about 
entity keys and mutations.
    *
    * <p>Commits are non-transactional.  If a commit fails because of a 
conflict over an entity
-   * group, the commit will be retried (up to {@link 
DatastoreV1#DATASTORE_BATCH_UPDATE_LIMIT}
+   * group, the commit will be retried (up to {@link 
DatastoreV1.DatastoreWriterFn#MAX_RETRIES}
    * times). This means that the mutation operation should be idempotent. 
Thus, the writer should
-   * only be used for {code upsert} and {@code delete} mutation operations, as 
these are the only
+   * only be used for {@code upsert} and {@code delete} mutation operations, 
as these are the only
    * two Cloud Datastore mutations that are idempotent.
    */
   @VisibleForTesting
@@ -1132,6 +1208,7 @@ public class DatastoreV1 {
     // Current batch of mutations to be written.
     private final List<Mutation> mutations = new ArrayList<>();
     private int mutationsSize = 0;  // Accumulated size of protos in mutations.
+    // Chooses how many mutations to buffer before each commit RPC; started in startBundle().
+    private final WriteBatcher writeBatcher;
 
     private static final int MAX_RETRIES = 5;
     private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
@@ -1139,24 +1216,27 @@ public class DatastoreV1 {
             
.withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5));
 
     DatastoreWriterFn(String projectId, @Nullable String localhost) {
-      this(StaticValueProvider.of(projectId), localhost, new 
V1DatastoreFactory());
+      this(StaticValueProvider.of(projectId), localhost, new V1DatastoreFactory(),
+          new WriteBatcherImpl());
     }
 
     DatastoreWriterFn(ValueProvider<String> projectId, @Nullable String 
localhost) {
-      this(projectId, localhost, new V1DatastoreFactory());
+      this(projectId, localhost, new V1DatastoreFactory(), new WriteBatcherImpl());
     }
 
     @VisibleForTesting
     DatastoreWriterFn(ValueProvider<String> projectId, @Nullable String 
localhost,
-        V1DatastoreFactory datastoreFactory) {
+        V1DatastoreFactory datastoreFactory, WriteBatcher writeBatcher) {
       this.projectId = checkNotNull(projectId, "projectId");
       this.localhost = localhost;
       this.datastoreFactory = datastoreFactory;
+      // Fail fast at construction time rather than with an NPE later in startBundle().
+      this.writeBatcher = checkNotNull(writeBatcher, "writeBatcher");
     }
 
     @StartBundle
     public void startBundle(StartBundleContext c) {
       datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), 
projectId.get(), localhost);
+      // The WriteBatcher contract requires start() before it is used.
+      writeBatcher.start();
     }
 
     @ProcessElement
@@ -1169,7 +1249,7 @@ public class DatastoreV1 {
       }
       mutations.add(c.element());
       mutationsSize += size;
-      if (mutations.size() >= DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT) {
+      if (mutations.size() >= 
writeBatcher.nextBatchSize(System.currentTimeMillis())) {
         flushBatch();
       }
     }
@@ -1199,18 +1279,32 @@ public class DatastoreV1 {
 
       while (true) {
         // Batch upsert entities.
+        CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+        commitRequest.addAllMutations(mutations);
+        commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+        long startTime = System.currentTimeMillis(), endTime;
+
         try {
-          CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
-          commitRequest.addAllMutations(mutations);
-          commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
           datastore.commit(commitRequest.build());
+          endTime = System.currentTimeMillis();
+
+          writeBatcher.addRequestLatency(endTime, endTime - startTime, 
mutations.size());
+
           // Break if the commit threw no exception.
           break;
         } catch (DatastoreException exception) {
+          if (exception.getCode() == Code.DEADLINE_EXCEEDED) {
+            /* Most errors are not related to request size, and should not 
change our expectation of
+             * the latency of successful requests. DEADLINE_EXCEEDED can be 
taken into
+             * consideration, though. */
+            endTime = System.currentTimeMillis();
+            writeBatcher.addRequestLatency(endTime, endTime - startTime, 
mutations.size());
+          }
+
           // Only log the code and message for potentially-transient errors. 
The entire exception
           // will be propagated upon the last retry.
-          LOG.error("Error writing to the Datastore ({}): {}", 
exception.getCode(),
-              exception.getMessage());
+          LOG.error("Error writing batch of {} mutations to Datastore ({}): 
{}", mutations.size(),
+              exception.getCode(), exception.getMessage());
           if (!BackOffUtils.next(sleeper, backoff)) {
             LOG.error("Aborting after {} retries.", MAX_RETRIES);
             throw exception;

http://git-wip-us.apache.org/repos/asf/beam/blob/0292a24f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/MovingAverage.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/MovingAverage.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/MovingAverage.java
new file mode 100644
index 0000000..0890e79
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/MovingAverage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.MovingFunction;
+
+
+/**
+ * Integer mean of the {@code long} values added over a sliding time period, built from two
+ * {@link MovingFunction}s that track the running sum of the values and the number of samples.
+ */
+final class MovingAverage {
+  private final MovingFunction sum;
+  private final MovingFunction count;
+
+  /**
+   * Creates an empty moving average. All arguments are forwarded unchanged to both underlying
+   * {@link MovingFunction}s.
+   *
+   * @param samplePeriodMs length of the period over which samples are averaged
+   * @param sampleUpdateMs sample interval of the underlying {@link MovingFunction}
+   * @param numSignificantBuckets see {@link MovingFunction}
+   * @param numSignificantSamples see {@link MovingFunction}
+   */
+  public MovingAverage(long samplePeriodMs, long sampleUpdateMs,
+                        int numSignificantBuckets, int numSignificantSamples) {
+    sum = new MovingFunction(samplePeriodMs, sampleUpdateMs,
+        numSignificantBuckets, numSignificantSamples, Sum.ofLongs());
+    count = new MovingFunction(samplePeriodMs, sampleUpdateMs,
+        numSignificantBuckets, numSignificantSamples, Sum.ofLongs());
+  }
+
+  /** Records {@code value} as a sample taken at {@code nowMsSinceEpoch}. */
+  public void add(long nowMsSinceEpoch, long value) {
+    sum.add(nowMsSinceEpoch, value);
+    count.add(nowMsSinceEpoch, 1);
+  }
+
+  /**
+   * Returns the mean (sum / count, truncated toward zero) of the samples currently in the period
+   * as of {@code nowMsSinceEpoch}.
+   *
+   * @throws IllegalStateException if there are no samples; check {@link #hasValue} first
+   */
+  public long get(long nowMsSinceEpoch) {
+    long numSamples = count.get(nowMsSinceEpoch);
+    if (numSamples == 0) {
+      throw new IllegalStateException("MovingAverage has no samples in the current period");
+    }
+    return sum.get(nowMsSinceEpoch) / numSamples;
+  }
+
+  /**
+   * Returns whether {@link #get} yields a meaningful value: both underlying functions report
+   * themselves significant and at least one sample is present.
+   */
+  public boolean hasValue(long nowMsSinceEpoch) {
+    return sum.isSignificant() && count.isSignificant()
+        && count.get(nowMsSinceEpoch) > 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0292a24f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index 229b1fb..946887c 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -27,7 +27,7 @@ import static 
com.google.datastore.v1.client.DatastoreHelper.makeOrder;
 import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
 import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
 import static 
org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_BYTES_LIMIT;
-import static 
org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT;
+import static 
org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START;
 import static 
org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.DEFAULT_BUNDLE_SIZE_BYTES;
 import static 
org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.QUERY_BATCH_LIMIT;
 import static 
org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.getEstimatedSizeBytes;
@@ -606,7 +606,7 @@ public class DatastoreV1Test {
   /** Tests {@link DatastoreWriterFn} with entities of more than one batches, 
but not a multiple. */
   @Test
   public void testDatatoreWriterFnWithMultipleBatches() throws Exception {
-    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 3 + 100);
+    // datastoreWriterFnTest uses FakeWriteBatcher, so full batches hold exactly START mutations.
+    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_ENTITIES_START * 3 + 100);
   }
 
   /**
@@ -615,7 +615,7 @@ public class DatastoreV1Test {
    */
   @Test
   public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception {
-    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 2);
+    // datastoreWriterFnTest uses FakeWriteBatcher, so full batches hold exactly START mutations.
+    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_ENTITIES_START * 2);
   }
 
   // A helper method to test DatastoreWriterFn for various batch sizes.
@@ -628,14 +628,14 @@ public class DatastoreV1Test {
     }
 
     DatastoreWriterFn datastoreWriter = new 
DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
-        null, mockDatastoreFactory);
+        null, mockDatastoreFactory, new FakeWriteBatcher());
     DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
     doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
     doFnTester.processBundle(mutations);
 
     int start = 0;
     while (start < numMutations) {
-      int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_LIMIT);
+      int end = Math.min(numMutations, start + 
DATASTORE_BATCH_UPDATE_ENTITIES_START);
       CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
       commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
       commitRequest.addAllMutations(mutations.subList(start, end));
@@ -662,7 +662,7 @@ public class DatastoreV1Test {
     }
 
     DatastoreWriterFn datastoreWriter = new 
DatastoreWriterFn(StaticValueProvider.of(PROJECT_ID),
-        null, mockDatastoreFactory);
+        null, mockDatastoreFactory, new FakeWriteBatcher());
     DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
     doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
     doFnTester.processBundle(mutations);
@@ -896,6 +896,50 @@ public class DatastoreV1Test {
         
.apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
   }
 
+  /** With no latency samples recorded, the batcher falls back to the initial batch size. */
+  @Test
+  public void testWriteBatcherWithoutData() {
+    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+    writeBatcher.start();
+    assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START, 
writeBatcher.nextBatchSize(0));
+  }
+
+  /**
+   * 1000 ms / 200 entities = 5 ms per entity; the 5000 ms target / 5 = 1000 entities, which is
+   * capped at the 500-entity API limit.
+   */
+  @Test
+  public void testWriteBatcherFastQueries() {
+    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+    writeBatcher.start();
+    writeBatcher.addRequestLatency(0, 1000, 200);
+    writeBatcher.addRequestLatency(0, 1000, 200);
+    assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT, 
writeBatcher.nextBatchSize(0));
+  }
+
+  /** 10000 ms / 200 entities = 50 ms per entity; the 5000 ms target / 50 = 100 entities. */
+  @Test
+  public void testWriteBatcherSlowQueries() {
+    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+    writeBatcher.start();
+    writeBatcher.addRequestLatency(0, 10000, 200);
+    writeBatcher.addRequestLatency(0, 10000, 200);
+    assertEquals(100, writeBatcher.nextBatchSize(0));
+  }
+
+  /**
+   * 30000 ms / 50 entities = 600 ms per entity; the 5000 ms target / 600 = 8 entities, which is
+   * raised to the 10-entity minimum.
+   */
+  @Test
+  public void testWriteBatcherSizeNotBelowMinimum() {
+    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+    writeBatcher.start();
+    writeBatcher.addRequestLatency(0, 30000, 50);
+    writeBatcher.addRequestLatency(0, 30000, 50);
+    assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_MIN, 
writeBatcher.nextBatchSize(0));
+  }
+
+  /**
+   * The later samples are 5000 ms / 200 = 25 ms per entity, giving 5000 / 25 = 200 entities. The
+   * expectation of 200 only holds if the slow t=0 sample (600 ms per entity) has aged out of the
+   * 2-minute sample period by t=150000; had it been kept, the mean would be (600+25+25)/3 = 216
+   * and the batch size 23.
+   */
+  @Test
+  public void testWriteBatcherSlidingWindow() {
+    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+    writeBatcher.start();
+    writeBatcher.addRequestLatency(0, 30000, 50);
+    writeBatcher.addRequestLatency(50000, 5000, 200);
+    writeBatcher.addRequestLatency(100000, 5000, 200);
+    assertEquals(200, writeBatcher.nextBatchSize(150000));
+  }
+
   /** Helper Methods */
 
   /** A helper function that verifies if all the queries have unique keys. */
@@ -1039,4 +1083,20 @@ public class DatastoreV1Test {
     }
     return queries;
   }
+
+  /**
+   * Deterministic {@link DatastoreV1.WriteBatcher} for unit tests. It discards every latency report
+   * and always asks for {@link DatastoreV1#DATASTORE_BATCH_UPDATE_ENTITIES_START} entities, so batch
+   * boundaries never depend on timing.
+   */
+  static class FakeWriteBatcher implements DatastoreV1.WriteBatcher {
+    @Override
+    public void start() {
+      // Stateless: nothing to initialise.
+    }
+
+    @Override
+    public void addRequestLatency(long timeSinceEpochMillis, long latencyMillis, int numMutations) {
+      // Latency reports are intentionally ignored.
+    }
+
+    @Override
+    public int nextBatchSize(long timeSinceEpochMillis) {
+      return DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START;
+    }
+  }
 }

Reply via email to