This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch revert-10418-cachesizebytes in repository https://gitbox.apache.org/repos/asf/beam.git
commit 6663da88e992dc02abbedc7207259b46c8c06884 Author: Lukasz Cwik <[email protected]> AuthorDate: Tue Apr 21 13:06:17 2020 -0700 Revert "[BEAM-9014] CachingShuffleBatchReader use bytes to limit cache size." --- .../worker/ApplianceShuffleEntryReader.java | 6 +-- .../worker/ChunkingShuffleBatchReader.java | 8 +-- .../common/worker/CachingShuffleBatchReader.java | 61 ++++------------------ .../util/common/worker/ShuffleBatchReader.java | 5 +- .../worker/BatchingShuffleEntryReaderTest.java | 18 +++---- .../worker/CachingShuffleBatchReaderTest.java | 4 +- 6 files changed, 26 insertions(+), 76 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ApplianceShuffleEntryReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ApplianceShuffleEntryReader.java index d184364..71228c5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ApplianceShuffleEntryReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ApplianceShuffleEntryReader.java @@ -44,9 +44,9 @@ public class ApplianceShuffleEntryReader implements ShuffleEntryReader { new ChunkingShuffleBatchReader(executionContext, operationContext, applianceShuffleReader); if (cache) { - // Limit the size of the cache to ~32 full shuffle batches. - final long maxBytes = 128L * 1024 * 1024; - batchReader = new CachingShuffleBatchReader(batchReader, maxBytes); + // Limit the size of the cache. + final int maxBatches = 32; + batchReader = new CachingShuffleBatchReader(batchReader, maxBatches); } entryReader = new BatchingShuffleEntryReader(batchReader); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java index 8939fa7..7f25c81 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ChunkingShuffleBatchReader.java @@ -59,18 +59,14 @@ final class ChunkingShuffleBatchReader implements ShuffleBatchReader { } DataInputStream input = new DataInputStream(new ByteArrayInputStream(result.chunk)); ArrayList<ShuffleEntry> entries = new ArrayList<>(); - long batchSize = 0; while (input.available() > 0) { - ShuffleEntry entry = getShuffleEntry(input); - batchSize += entry.length(); - entries.add(entry); + entries.add(getShuffleEntry(input)); } return new Batch( entries, result.nextStartPosition == null ? null - : ByteArrayShufflePosition.of(result.nextStartPosition), - batchSize); + : ByteArrayShufflePosition.of(result.nextStartPosition)); } /** diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java index c769881..fc87bc4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReader.java @@ -18,8 +18,8 @@ package org.apache.beam.runners.dataflow.worker.util.common.worker; import java.io.IOException; -import java.time.Duration; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects; @@ -27,41 +27,26 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Weigher; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints; /** A {@link ShuffleBatchReader} that caches batches as they're read. */ public class CachingShuffleBatchReader implements ShuffleBatchReader { private final ShuffleBatchReader reader; @VisibleForTesting final LoadingCache<BatchRange, Batch> cache; - /** - * Limit the size of the cache to 1GiB of batches. - * - * <p>If this increases beyond Integer.MAX_VALUE then {@link BatchWeigher} must be updated. - * Because a batch may be larger than 1GiB, the actual in-memory batch size may exceed this value. - */ - private static final int MAXIMUM_WEIGHT = 1024 * 1024 * 1024; + /** Limit the size of the cache to 1000 batches. */ + private static final int MAXIMUM_BATCHES = 1000; // Ensure that batches in the cache are expired quickly // for improved GC performance. - private static final Duration EXPIRE_AFTER = Duration.ofMillis(250); + private static final long EXPIRE_AFTER_MS = 250; - /** - * Creates the caching reader. - * - * @param shuffleReader wrapped reader. - * @param maximumWeightBytes maximum bytes for the cache. - * @param expireAfterAccess cache items may be evicted after the elapsed duration. - */ public CachingShuffleBatchReader( - ShuffleBatchReader shuffleReader, long maximumWeightBytes, Duration expireAfterAccess) { + ShuffleBatchReader shuffleReader, int maximumBatches, long expireAfterAccessMillis) { this.reader = shuffleReader; this.cache = CacheBuilder.newBuilder() - .maximumWeight(maximumWeightBytes) - .weigher(new BatchWeigher()) - .expireAfterAccess(expireAfterAccess) + .maximumSize(maximumBatches) + .expireAfterAccess(expireAfterAccessMillis, TimeUnit.MILLISECONDS) .<BatchRange, Batch>build( new CacheLoader<BatchRange, Batch>() { @Override @@ -73,24 +58,12 @@ public class CachingShuffleBatchReader implements ShuffleBatchReader { }); } - /** - * Creates the caching reader with a maximum size of {@link MAXIMUM_WEIGHT} and an element expiry - * duration of {@link EXPIRE_AFTER}. - * - * @param shuffleReader wrapped reader. - */ public CachingShuffleBatchReader(ShuffleBatchReader shuffleReader) { - this(shuffleReader, MAXIMUM_WEIGHT, EXPIRE_AFTER); + this(shuffleReader, MAXIMUM_BATCHES, EXPIRE_AFTER_MS); } - /** - * Creates the caching reader with an element expiry duration of {@link EXPIRE_AFTER}. - * - * @param shuffleReader wrapped reader. - * @param maximumWeightBytes maximum bytes for the cache. - */ - public CachingShuffleBatchReader(ShuffleBatchReader shuffleReader, long maximumWeightBytes) { - this(shuffleReader, maximumWeightBytes, EXPIRE_AFTER); + public CachingShuffleBatchReader(ShuffleBatchReader shuffleReader, int maximumBatches) { + this(shuffleReader, maximumBatches, EXPIRE_AFTER_MS); } @Override @@ -129,18 +102,4 @@ public class CachingShuffleBatchReader implements ShuffleBatchReader { return Objects.hashCode(startPosition, endPosition); } } - - /** - * Returns the weight of a Batch, in bytes, within the range [0, Integer.MAX_VALUE]. - * - * <p>The cache holds {@link MAX_WEIGHT} bytes. If {@link MAX_WEIGHT} is increased beyond - * Integer.MAX_VALUE bytes, a new weighing heuristic will be required to avoid under representing - * the number of bytes in memory. - */ - static final class BatchWeigher implements Weigher<BatchRange, Batch> { - @Override - public int weigh(BatchRange key, Batch value) { - return Ints.saturatedCast(value.bytes); - } - } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleBatchReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleBatchReader.java index 29890c8..d1676eb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleBatchReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleBatchReader.java @@ -30,13 +30,10 @@ public interface ShuffleBatchReader { public static class Batch { public final List<ShuffleEntry> entries; @Nullable public final ShufflePosition nextStartPosition; - public final long bytes; - public Batch( - List<ShuffleEntry> entries, @Nullable ShufflePosition nextStartPosition, long bytes) { + public Batch(List<ShuffleEntry> entries, @Nullable ShufflePosition nextStartPosition) { this.entries = entries; this.nextStartPosition = nextStartPosition; - this.bytes = bytes; } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReaderTest.java index faa4b05..9ee670b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReaderTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/BatchingShuffleEntryReaderTest.java @@ -18,9 +18,9 @@ package org.apache.beam.runners.dataflow.worker.util.common.worker; import static com.google.api.client.util.Lists.newArrayList; -import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -68,9 +68,8 @@ public final class BatchingShuffleEntryReaderTest { ArrayList<ShuffleEntry> entries = new ArrayList<>(); entries.add(e1); entries.add(e2); - long batchSize = (long) e1.length() + e2.length(); when(batchReader.read(START_POSITION, END_POSITION)) - .thenReturn(new ShuffleBatchReader.Batch(entries, null, batchSize)); + .thenReturn(new ShuffleBatchReader.Batch(entries, null)); List<ShuffleEntry> results = newArrayList(reader.read(START_POSITION, END_POSITION)); assertThat(results, contains(e1, e2)); } @@ -82,9 +81,8 @@ public final class BatchingShuffleEntryReaderTest { ArrayList<ShuffleEntry> entries = new ArrayList<>(); entries.add(e1); entries.add(e2); - long batchSize = (long) e1.length() + e2.length(); when(batchReader.read(START_POSITION, END_POSITION)) - .thenReturn(new ShuffleBatchReader.Batch(entries, null, batchSize)); + .thenReturn(new ShuffleBatchReader.Batch(entries, null)); Reiterator<ShuffleEntry> it = reader.read(START_POSITION, END_POSITION); assertThat(it.hasNext(), equalTo(Boolean.TRUE)); assertThat(it.next(), equalTo(e1)); @@ -104,9 +102,9 @@ public final class BatchingShuffleEntryReaderTest { ShuffleEntry e2 = new ShuffleEntry(KEY, SKEY, VALUE); List<ShuffleEntry> e2s = Collections.singletonList(e2); when(batchReader.read(START_POSITION, END_POSITION)) - .thenReturn(new ShuffleBatchReader.Batch(e1s, NEXT_START_POSITION, e1.length())); + .thenReturn(new ShuffleBatchReader.Batch(e1s, NEXT_START_POSITION)); when(batchReader.read(NEXT_START_POSITION, END_POSITION)) - .thenReturn(new ShuffleBatchReader.Batch(e2s, null, e2.length())); + .thenReturn(new ShuffleBatchReader.Batch(e2s, null)); List<ShuffleEntry> results = newArrayList(reader.read(START_POSITION, END_POSITION)); assertThat(results, contains(e1, e2)); @@ -122,11 +120,11 @@ public final class BatchingShuffleEntryReaderTest { ShuffleEntry e3 = new ShuffleEntry(KEY, SKEY, VALUE); List<ShuffleEntry> e3s = Collections.singletonList(e3); when(batchReader.read(START_POSITION, END_POSITION)) - .thenReturn(new ShuffleBatchReader.Batch(e1s, NEXT_START_POSITION, 0)); + .thenReturn(new ShuffleBatchReader.Batch(e1s, NEXT_START_POSITION)); when(batchReader.read(NEXT_START_POSITION, END_POSITION)) - .thenReturn(new ShuffleBatchReader.Batch(e2s, SECOND_NEXT_START_POSITION, 0)); + .thenReturn(new ShuffleBatchReader.Batch(e2s, SECOND_NEXT_START_POSITION)); when(batchReader.read(SECOND_NEXT_START_POSITION, END_POSITION)) - .thenReturn(new ShuffleBatchReader.Batch(e3s, null, e3.length())); + .thenReturn(new ShuffleBatchReader.Batch(e3s, null)); List<ShuffleEntry> results = newArrayList(reader.read(START_POSITION, END_POSITION)); assertThat(results, contains(e3)); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java index 27dbc1d..88db3b2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java @@ -17,9 +17,9 @@ */ package org.apache.beam.runners.dataflow.worker.util.common.worker; -import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -37,7 +37,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public final class CachingShuffleBatchReaderTest { private final ShuffleBatchReader.Batch testBatch = - new ShuffleBatchReader.Batch(new ArrayList<ShuffleEntry>(), null, 0); + new ShuffleBatchReader.Batch(new ArrayList<ShuffleEntry>(), null); @Test public void readerShouldCacheReads() throws IOException {
