This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5c475d4 [FLINK-23354][blob] Limit the size of ShuffleDescriptors in
PermanentBlobCache on TaskExecutor
5c475d4 is described below
commit 5c475d41fea3c81557e0d463bed1c94024dd0da5
Author: Thesharing <[email protected]>
AuthorDate: Thu Jul 15 15:24:36 2021 +0800
[FLINK-23354][blob] Limit the size of ShuffleDescriptors in
PermanentBlobCache on TaskExecutor
This closes #16498.
---
.../flink/runtime/blob/BlobCacheSizeTracker.java | 204 +++++++++++++++++
.../flink/runtime/blob/PermanentBlobCache.java | 162 +++++++++++++
.../flink/runtime/blob/PermanentBlobService.java | 21 ++
.../deployment/InputGateDeploymentDescriptor.java | 5 +-
.../flink/runtime/blob/BlobCacheCleanupTest.java | 19 +-
.../runtime/blob/BlobCacheSizeTrackerTest.java | 183 +++++++++++++++
.../blob/PermanentBlobCacheSizeLimitTest.java | 255 +++++++++++++++++++++
...hDeploymentWithSmallBlobCacheSizeLimitTest.java | 194 ++++++++++++++++
8 files changed, 1036 insertions(+), 7 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
new file mode 100644
index 0000000..0484389
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCacheSizeTracker.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * BlobCacheSizeTracker uses {@link LinkedHashMap} to maintain the LRU order
for the files in the
+ * cache. When new files are intended to be put into cache, {@code checkLimit}
is called to query
+ * the files should be removed. This tracker maintains a lock to avoid
concurrent modification. To
+ * avoid the inconsistency, make sure that hold the READ/WRITE lock in {@link
PermanentBlobCache}
+ * first and then hold the lock here.
+ */
+public class BlobCacheSizeTracker {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BlobCacheSizeTracker.class);
+
+ private static final int INITIAL_SIZE = 10_000;
+
+ private final Object lock = new Object();
+
+ protected final long sizeLimit;
+
+ @GuardedBy("lock")
+ private long total;
+
+ @GuardedBy("lock")
+ private final LinkedHashMap<Tuple2<JobID, BlobKey>, Long> caches;
+
+ @GuardedBy("lock")
+ private final HashMap<JobID, Set<BlobKey>> blobKeyByJob;
+
+ public BlobCacheSizeTracker(long sizeLimit) {
+ checkArgument(sizeLimit > 0);
+
+ this.sizeLimit = sizeLimit;
+ this.total = 0L;
+ this.caches = new LinkedHashMap<>(INITIAL_SIZE, 0.75F, true);
+ this.blobKeyByJob = new HashMap<>();
+ }
+
+ /**
+ * Check the size limit and return the BLOBs to delete.
+ *
+ * @param size size of the BLOB intended to put into the cache
+ * @return list of BLOBs to delete before putting into the target BLOB
+ */
+ public List<Tuple2<JobID, BlobKey>> checkLimit(long size) {
+ checkArgument(size >= 0);
+
+ synchronized (lock) {
+ List<Tuple2<JobID, BlobKey>> blobsToDelete = new ArrayList<>();
+
+ long current = total;
+
+ for (Map.Entry<Tuple2<JobID, BlobKey>, Long> entry :
caches.entrySet()) {
+ if (current + size > sizeLimit) {
+ blobsToDelete.add(entry.getKey());
+ current -= entry.getValue();
+ }
+ }
+
+ return blobsToDelete;
+ }
+ }
+
+ /** Register the BLOB to the tracker. */
+ public void track(JobID jobId, BlobKey blobKey, long size) {
+ checkNotNull(jobId);
+ checkNotNull(blobKey);
+ checkArgument(size >= 0);
+
+ synchronized (lock) {
+ if (caches.putIfAbsent(Tuple2.of(jobId, blobKey), size) == null) {
+ blobKeyByJob.computeIfAbsent(jobId, ignore -> new
HashSet<>()).add(blobKey);
+
+ total += size;
+ if (total > sizeLimit) {
+ LOG.warn(
+ "The overall size of BLOBs in the cache exceeds "
+ + "the limit. Limit = [{}], Current: [{}],
"
+ + "The size of next BLOB: [{}].",
+ sizeLimit,
+ total,
+ size);
+ }
+ } else {
+ LOG.warn(
+ "Attempt to track a duplicated BLOB. This may indicate
a duplicate upload "
+ + "or a hash collision. Ignoring newest
upload. "
+ + "JobID = [{}], BlobKey = [{}]",
+ jobId,
+ blobKey);
+ }
+ }
+ }
+
+ /** Remove the BLOB from the tracker. */
+ public void untrack(Tuple2<JobID, BlobKey> key) {
+ checkNotNull(key);
+ checkNotNull(key.f0);
+ checkNotNull(key.f1);
+
+ synchronized (lock) {
+ blobKeyByJob.computeIfAbsent(key.f0, ignore -> new
HashSet<>()).remove(key.f1);
+
+ Long size = caches.remove(key);
+ if (size != null) {
+ checkState(size >= 0);
+ total -= size;
+ }
+ }
+ }
+
+ /** Remove the BLOB from the tracker. */
+ private void untrack(JobID jobId, BlobKey blobKey) {
+ checkNotNull(jobId);
+ checkNotNull(blobKey);
+
+ untrack(Tuple2.of(jobId, blobKey));
+ }
+
+ /**
+ * Update the least used index for the BLOBs so that the tracker can
easily find out the least
+ * recently used BLOBs.
+ */
+ public void update(JobID jobId, BlobKey blobKey) {
+ checkNotNull(jobId);
+ checkNotNull(blobKey);
+
+ synchronized (lock) {
+ caches.get(Tuple2.of(jobId, blobKey));
+ }
+ }
+
+ /** Unregister all the tracked BLOBs related to the given job. */
+ public void untrackAll(JobID jobId) {
+ checkNotNull(jobId);
+
+ synchronized (lock) {
+ Set<BlobKey> keysToRemove = blobKeyByJob.remove(jobId);
+ if (keysToRemove != null) {
+ for (BlobKey key : keysToRemove) {
+ untrack(jobId, key);
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
+ Long getSize(JobID jobId, BlobKey blobKey) {
+ checkNotNull(jobId);
+ checkNotNull(blobKey);
+
+ synchronized (lock) {
+ return caches.get(Tuple2.of(jobId, blobKey));
+ }
+ }
+
+ @VisibleForTesting
+ Set<BlobKey> getBlobKeysByJobId(JobID jobId) {
+ checkNotNull(jobId);
+
+ synchronized (lock) {
+ return blobKeyByJob.getOrDefault(jobId, Collections.emptySet());
+ }
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
index 0f64667..f2238ef 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
@@ -20,10 +20,13 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.util.FileUtils;
+import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -33,6 +36,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
@@ -65,6 +69,8 @@ public class PermanentBlobCache extends AbstractBlobCache
implements PermanentBl
public long keepUntil = -1;
}
+ private static final int DEFAULT_SIZE_LIMIT_MB = 100;
+
/** Map to store the number of references to a specific job. */
private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
@@ -74,6 +80,8 @@ public class PermanentBlobCache extends AbstractBlobCache
implements PermanentBl
/** Timer task to execute the cleanup at regular intervals. */
private final Timer cleanupTimer;
+ private final BlobCacheSizeTracker blobCacheSizeTracker;
+
/**
* Instantiates a new cache for permanent BLOBs which are also available
in an HA store.
*
@@ -90,6 +98,21 @@ public class PermanentBlobCache extends AbstractBlobCache
implements PermanentBl
@Nullable final InetSocketAddress serverAddress)
throws IOException {
+ this(
+ blobClientConfig,
+ blobView,
+ serverAddress,
+ new
BlobCacheSizeTracker(MemorySize.ofMebiBytes(DEFAULT_SIZE_LIMIT_MB).getBytes()));
+ }
+
+ @VisibleForTesting
+ public PermanentBlobCache(
+ final Configuration blobClientConfig,
+ final BlobView blobView,
+ @Nullable final InetSocketAddress serverAddress,
+ BlobCacheSizeTracker blobCacheSizeTracker)
+ throws IOException {
+
super(
blobClientConfig,
blobView,
@@ -102,6 +125,8 @@ public class PermanentBlobCache extends AbstractBlobCache
implements PermanentBl
this.cleanupInterval =
blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
this.cleanupTimer.schedule(
new PermanentBlobCleanupTask(), cleanupInterval,
cleanupInterval);
+
+ this.blobCacheSizeTracker = blobCacheSizeTracker;
}
/**
@@ -188,6 +213,142 @@ public class PermanentBlobCache extends AbstractBlobCache
implements PermanentBl
}
/**
+ * Returns the content of the file for the BLOB with the provided job ID
the blob key.
+ *
+ * <p>The method will first attempt to serve the BLOB from the local
cache. If the BLOB is not
+ * in the cache, the method will try to download it from the HA store, or
directly from the
+ * {@link BlobServer}.
+ *
+ * <p>Compared to {@code getFile}, {@code readFile} makes sure that the
file is fully read in
+ * the same write lock as the file is accessed. This avoids the scenario
that the path is
+ * returned as the file is deleted concurrently by other threads.
+ *
+ * @param jobId ID of the job this blob belongs to
+ * @param blobKey BLOB key associated with the requested file
+ * @return The content of the BLOB.
+ * @throws java.io.FileNotFoundException if the BLOB does not exist;
+ * @throws IOException if any other error occurs when retrieving the file.
+ */
    /**
     * Returns the content of the file for the BLOB with the provided job ID the blob key.
     *
     * <p>The method will first attempt to serve the BLOB from the local cache. If the BLOB is not
     * in the cache, the method will try to download it from the HA store, or directly from the
     * {@link BlobServer}.
     *
     * <p>Compared to {@code getFile}, {@code readFile} makes sure that the file is fully read in
     * the same write lock as the file is accessed. This avoids the scenario that the path is
     * returned as the file is deleted concurrently by other threads.
     *
     * @param jobId ID of the job this blob belongs to
     * @param blobKey BLOB key associated with the requested file
     * @return The content of the BLOB.
     * @throws java.io.FileNotFoundException if the BLOB does not exist;
     * @throws IOException if any other error occurs when retrieving the file.
     */
    @Override
    public byte[] readFile(JobID jobId, PermanentBlobKey blobKey) throws IOException {
        checkNotNull(jobId);
        checkNotNull(blobKey);

        final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
        readWriteLock.readLock().lock();

        try {
            if (localFile.exists()) {
                // Cache hit: touch the LRU index so this BLOB becomes most-recently-used,
                // then read the bytes while still holding the read lock so a concurrent
                // eviction cannot delete the file under us.
                blobCacheSizeTracker.update(jobId, blobKey);
                return FileUtils.readAllBytes(localFile.toPath());
            }
        } finally {
            readWriteLock.readLock().unlock();
        }

        // first try the distributed blob store (if available)
        // use a temporary file (thread-safe without locking)
        File incomingFile = createTemporaryFilename();
        try {
            try {
                if (blobView.get(jobId, blobKey, incomingFile)) {
                    // now move the temp file to our local cache atomically
                    readWriteLock.writeLock().lock();
                    try {
                        // Move + read happen under the same write lock so the file cannot
                        // be evicted between being stored and being read.
                        checkLimitAndMoveFile(incomingFile, jobId, blobKey, localFile, log, null);
                        return FileUtils.readAllBytes(localFile.toPath());
                    } finally {
                        readWriteLock.writeLock().unlock();
                    }
                }
            } catch (Exception e) {
                // Best-effort: HA-store failures fall through to the BlobServer download below.
                log.info(
                        "Failed to copy from blob store. Downloading from BLOB server instead.", e);
            }

            // Read the volatile field once so the null check and the use see the same value.
            final InetSocketAddress currentServerAddress = serverAddress;

            if (currentServerAddress != null) {
                // fallback: download from the BlobServer
                BlobClient.downloadFromBlobServer(
                        jobId,
                        blobKey,
                        incomingFile,
                        currentServerAddress,
                        blobClientConfig,
                        numFetchRetries);

                readWriteLock.writeLock().lock();
                try {
                    checkLimitAndMoveFile(incomingFile, jobId, blobKey, localFile, log, null);
                    return FileUtils.readAllBytes(localFile.toPath());
                } finally {
                    readWriteLock.writeLock().unlock();
                }
            } else {
                throw new IOException(
                        "Cannot download from BlobServer, because the server address is unknown.");
            }

        } finally {
            // delete incomingFile from a failed download
            if (!incomingFile.delete() && incomingFile.exists()) {
                log.warn(
                        "Could not delete the staging file {} for blob key {} and job {}.",
                        incomingFile,
                        blobKey,
                        jobId);
            }
        }
    }
+
    /**
     * Evicts least-recently-used BLOBs until the incoming file fits under the cache size limit,
     * then moves the incoming file into the cache and registers it with the size tracker.
     *
     * <p>NOTE(review): must be called while holding {@code readWriteLock.writeLock()} — both call
     * sites in {@code readFile} do so; the tracker's own lock alone is not sufficient to keep the
     * file system and the tracker consistent.
     *
     * @param incomingFile temporary file containing the downloaded BLOB
     * @param jobId ID of the job this blob belongs to
     * @param blobKey BLOB key associated with the file
     * @param localFile target location inside the cache's storage directory
     * @param log logger for the move operation
     * @param blobStore optional HA blob store to also persist the file to (may be null)
     * @throws IOException if moving the file fails
     */
    private void checkLimitAndMoveFile(
            File incomingFile,
            JobID jobId,
            BlobKey blobKey,
            File localFile,
            Logger log,
            @Nullable BlobStore blobStore)
            throws IOException {

        // Check the size limit and delete the files that exceeds the limit
        final long sizeOfIncomingFile = incomingFile.length();
        final List<Tuple2<JobID, BlobKey>> blobsToDelete =
                blobCacheSizeTracker.checkLimit(sizeOfIncomingFile);

        for (Tuple2<JobID, BlobKey> key : blobsToDelete) {
            // Only untrack entries whose backing file was actually removed, so the
            // tracker never claims less space than is really on disk.
            if (deleteFile(key.f0, key.f1)) {
                blobCacheSizeTracker.untrack(key);
            }
        }

        // Move the file and register it to the tracker
        BlobUtils.moveTempFileToStore(incomingFile, jobId, blobKey, localFile, log, blobStore);
        blobCacheSizeTracker.track(jobId, blobKey, localFile.length());
    }
+
+ /**
+ * Delete the blob file with the given key.
+ *
+ * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if
job-unrelated)
+ * @param blobKey The key of the desired BLOB.
+ */
+ private boolean deleteFile(JobID jobId, BlobKey blobKey) {
+ final File localFile =
+ new File(
+ BlobUtils.getStorageLocationPath(
+ storageDir.getAbsolutePath(), jobId, blobKey));
+ if (!localFile.delete() && localFile.exists()) {
+ log.warn(
+ "Failed to delete locally cached BLOB {} at {}",
+ blobKey,
+ localFile.getAbsolutePath());
+ return false;
+ }
+ return true;
+ }
+
+ /**
* Returns a file handle to the file associated with the given blob key on
the blob server.
*
* @param jobId ID of the job this blob belongs to (or <tt>null</tt> if
job-unrelated)
@@ -247,6 +408,7 @@ public class PermanentBlobCache extends AbstractBlobCache
implements PermanentBl
boolean success = false;
try {
+ blobCacheSizeTracker.untrackAll(jobId);
FileUtils.deleteDirectory(localFile);
success = true;
} catch (Throwable t) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
index 6fe10a3..b12458e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FileUtils;
import java.io.Closeable;
import java.io.File;
@@ -45,4 +46,24 @@ public interface PermanentBlobService extends Closeable {
* @throws IOException if any other error occurs when retrieving the file
*/
File getFile(JobID jobId, PermanentBlobKey key) throws IOException;
+
+ /**
+ * Returns the content of the file for the BLOB with the provided job ID
the blob key.
+ *
+ * <p>Compared to {@code getFile}, {@code readFile} will attempt to read
the entire file after
+ * retrieving it. If file reading and file retrieving is done in the same
WRITE lock, it can
+ * avoid the scenario that the path to the file is deleted concurrently by
other threads when
+ * the file is retrieved but not read yet.
+ *
+ * @param jobId ID of the job this blob belongs to
+ * @param key BLOB key associated with the requested file
+ * @return The content of the BLOB.
+ * @throws java.io.FileNotFoundException if the BLOB does not exist;
+ * @throws IOException if any other error occurs when retrieving the file.
+ */
+ default byte[] readFile(JobID jobId, PermanentBlobKey key) throws
IOException {
+ // The default implementation doesn't guarantee that the file won't be
deleted concurrently
+ // by other threads while reading the contents.
+ return FileUtils.readAllBytes(getFile(jobId, key).toPath());
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
index 6863219..9b10675 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
@@ -31,13 +31,11 @@ import
org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.util.CompressedSerializedValue;
-import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nonnegative;
import javax.annotation.Nullable;
-import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
@@ -128,12 +126,11 @@ public class InputGateDeploymentDescriptor implements
Serializable {
Preconditions.checkNotNull(blobService);
- final File dataFile = blobService.getFile(jobId, blobKey);
// NOTE: Do not delete the ShuffleDescriptor BLOBs since it may be
needed again during
// recovery. (it is deleted automatically on the BLOB server and
cache when its
// partition is no longer available or the job enters a terminal
state)
CompressedSerializedValue<ShuffleDescriptor[]> serializedValue =
-
CompressedSerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
+
CompressedSerializedValue.fromBytes(blobService.readFile(jobId, blobKey));
serializedInputChannels = new NonOffloaded<>(serializedValue);
Preconditions.checkNotNull(serializedInputChannels);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
index d9a5699..c70c991 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
@@ -236,7 +237,9 @@ public class BlobCacheCleanupTest extends TestLogger {
server = new BlobServer(config, new VoidBlobStore());
server.start();
InetSocketAddress serverAddress = new
InetSocketAddress("localhost", server.getPort());
- cache = new PermanentBlobCache(config, new VoidBlobStore(),
serverAddress);
+ final BlobCacheSizeTracker tracker =
+ new
BlobCacheSizeTracker(MemorySize.ofMebiBytes(100).getBytes());
+ cache = new PermanentBlobCache(config, new VoidBlobStore(),
serverAddress, tracker);
// upload blobs
keys.add(server.putPermanent(jobId, buf));
@@ -245,26 +248,29 @@ public class BlobCacheCleanupTest extends TestLogger {
checkFileCountForJob(2, jobId, server);
checkFileCountForJob(0, jobId, cache);
+ checkBlobCacheSizeTracker(tracker, jobId, 0);
// register once
cache.registerJob(jobId);
checkFileCountForJob(2, jobId, server);
checkFileCountForJob(0, jobId, cache);
+ checkBlobCacheSizeTracker(tracker, jobId, 0);
for (PermanentBlobKey key : keys) {
- cache.getFile(jobId, key);
+ cache.readFile(jobId, key);
}
// register again (let's say, from another thread or so)
cache.registerJob(jobId);
for (PermanentBlobKey key : keys) {
- cache.getFile(jobId, key);
+ cache.readFile(jobId, key);
}
assertEquals(2, checkFilesExist(jobId, keys, cache, true));
checkFileCountForJob(2, jobId, server);
checkFileCountForJob(2, jobId, cache);
+ checkBlobCacheSizeTracker(tracker, jobId, 2);
// after releasing once, nothing should change
cache.releaseJob(jobId);
@@ -272,6 +278,7 @@ public class BlobCacheCleanupTest extends TestLogger {
assertEquals(2, checkFilesExist(jobId, keys, cache, true));
checkFileCountForJob(2, jobId, server);
checkFileCountForJob(2, jobId, cache);
+ checkBlobCacheSizeTracker(tracker, jobId, 2);
// after releasing the second time, the job is up for deferred
cleanup
cache.releaseJob(jobId);
@@ -289,6 +296,7 @@ public class BlobCacheCleanupTest extends TestLogger {
// files are up for cleanup now...wait for it:
verifyJobCleanup(cache, jobId, keys);
+ checkBlobCacheSizeTracker(tracker, jobId, 0);
// server should be unaffected
checkFileCountForJob(2, jobId, server);
} finally {
@@ -447,4 +455,9 @@ public class BlobCacheCleanupTest extends TestLogger {
// this fails if we exited via a timeout
checkFileCountForJob(0, jobId, cache);
}
+
+ private static void checkBlobCacheSizeTracker(
+ BlobCacheSizeTracker tracker, JobID jobId, int expected) {
+ assertEquals(tracker.getBlobKeysByJobId(jobId).size(), expected);
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java
new file mode 100644
index 0000000..f8d6424
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSizeTrackerTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.blob.BlobKey.BlobType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link BlobCacheSizeTracker}. */
/** Tests for {@link BlobCacheSizeTracker}. */
public class BlobCacheSizeTrackerTest extends TestLogger {

    private BlobCacheSizeTracker tracker;
    private JobID jobId;
    private BlobKey blobKey;

    @Before
    public void setup() {
        // Limit of 5 bytes with a single 3-byte BLOB pre-tracked: adding another
        // 3-byte BLOB would exceed the limit, which is what most tests rely on.
        tracker = new BlobCacheSizeTracker(5L);
        jobId = new JobID();
        blobKey = BlobKey.createKey(BlobType.PERMANENT_BLOB);

        tracker.track(jobId, blobKey, 3L);
    }

    @Test
    public void testCheckLimit() {
        // 3 (tracked) + 3 (incoming) > 5, so the pre-tracked BLOB must be evicted.
        List<Tuple2<JobID, BlobKey>> keys = tracker.checkLimit(3L);

        assertEquals(1, keys.size());
        assertEquals(jobId, keys.get(0).f0);
        assertEquals(blobKey, keys.get(0).f1);
    }

    /** If an empty BLOB is intended to be stored, no BLOBs should be removed. */
    @Test
    public void testCheckLimitForEmptyBlob() {
        List<Tuple2<JobID, BlobKey>> keys = tracker.checkLimit(0L);

        assertEquals(0, keys.size());
    }

    @Test(expected = IllegalArgumentException.class)
    public void testCheckLimitForBlobWithNegativeSize() {
        tracker.checkLimit(-1L);
    }

    @Test
    public void testTrack() {
        // The BLOB tracked in setup() is visible via both indexes.
        assertEquals(3L, (long) tracker.getSize(jobId, blobKey));
        assertTrue(tracker.getBlobKeysByJobId(jobId).contains(blobKey));
    }

    /**
     * When trying to track a duplicated BLOB, the new one will be ignored, just like {@link
     * BlobUtils#moveTempFileToStore} does.
     */
    @Test
    public void testTrackDuplicatedBlob() {
        tracker.track(jobId, blobKey, 1L);
        // The original size (3) wins; the duplicate (1) is ignored.
        assertEquals(3L, (long) tracker.getSize(jobId, blobKey));
        assertEquals(1, tracker.getBlobKeysByJobId(jobId).size());
    }

    @Test
    public void testUntrack() {
        // Sanity check before untracking: the BLOB would need to be evicted.
        assertEquals(1, tracker.checkLimit(3L).size());
        tracker.untrack(Tuple2.of(jobId, blobKey));

        // After untracking, the BLOB is gone from both indexes and frees its quota.
        assertNull(tracker.getSize(jobId, blobKey));
        assertEquals(0, tracker.getBlobKeysByJobId(jobId).size());
        assertEquals(0, tracker.checkLimit(3L).size());
    }

    /** Untracking a non-existing BLOB shouldn't change anything or throw any exceptions. */
    @Test
    public void testUntrackNonExistingBlob() {
        tracker.untrack(Tuple2.of(jobId, BlobKey.createKey(BlobType.PERMANENT_BLOB)));
        assertEquals(1, tracker.getBlobKeysByJobId(jobId).size());
    }

    /**
     * Since the BlobCacheSizeLimitTracker only works in {@link PermanentBlobCache}, the JobID
     * shouldn't be null.
     */
    @Test(expected = NullPointerException.class)
    public void testUntrackBlobWithNullJobId() {
        tracker.untrack(Tuple2.of(null, BlobKey.createKey(BlobType.PERMANENT_BLOB)));
    }

    @Test
    public void testUpdate() {
        // Fresh tracker (ignores the one from setup()): 5 one-byte BLOBs fill the limit.
        BlobCacheSizeTracker tracker = new BlobCacheSizeTracker(5L);
        List<JobID> jobIds = new ArrayList<>();
        List<BlobKey> blobKeys = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            jobIds.add(new JobID());
            blobKeys.add(BlobKey.createKey(BlobType.PERMANENT_BLOB));
        }
        for (int i = 0; i < 5; i++) {
            tracker.track(jobIds.get(i), blobKeys.get(i), 1);
        }
        // Touch entries 1 and 2 so they become most-recently-used ...
        tracker.update(jobIds.get(1), blobKeys.get(1));
        tracker.update(jobIds.get(2), blobKeys.get(2));

        List<Tuple2<JobID, BlobKey>> blobsToDelete = tracker.checkLimit(2);

        // ... leaving entries 0 and 3 as the least-recently-used eviction candidates.
        assertThat(
                blobsToDelete,
                containsInAnyOrder(
                        Tuple2.of(jobIds.get(0), blobKeys.get(0)),
                        Tuple2.of(jobIds.get(3), blobKeys.get(3))));
    }

    /**
     * Updating the status for a non-existing BLOB shouldn't change anything or throw any
     * exceptions.
     */
    @Test
    public void testUpdateNonExistingBlob() {
        tracker.track(new JobID(), BlobKey.createKey(BlobType.PERMANENT_BLOB), 2L);
        assertEquals(1, tracker.checkLimit(3L).size());

        tracker.update(new JobID(), BlobKey.createKey(BlobType.PERMANENT_BLOB));
        assertEquals(1, tracker.checkLimit(3L).size());
    }

    @Test
    public void testUntrackAll() {
        // Second BLOB for the same job (setup() already tracked one), plus one for another job.
        tracker.track(jobId, BlobKey.createKey(BlobType.PERMANENT_BLOB), 1L);

        JobID anotherJobId = new JobID();
        tracker.track(anotherJobId, BlobKey.createKey(BlobType.PERMANENT_BLOB), 1L);

        assertEquals(2, tracker.getBlobKeysByJobId(jobId).size());
        tracker.untrackAll(jobId);

        // Only the given job's BLOBs are removed; other jobs are untouched.
        assertEquals(0, tracker.getBlobKeysByJobId(jobId).size());
        assertEquals(1, tracker.getBlobKeysByJobId(anotherJobId).size());
    }

    /**
     * Untracking all BLOBs for a non-existing job shouldn't change anything or throw any
     * exceptions.
     */
    @Test
    public void testUntrackAllWithNonExistingJob() {
        tracker.track(jobId, BlobKey.createKey(BlobType.PERMANENT_BLOB), 1L);

        assertEquals(2, tracker.getBlobKeysByJobId(jobId).size());
        tracker.untrackAll(new JobID());

        assertEquals(2, tracker.getBlobKeysByJobId(jobId).size());
    }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/PermanentBlobCacheSizeLimitTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/PermanentBlobCacheSizeLimitTest.java
new file mode 100644
index 0000000..a9272f5
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/PermanentBlobCacheSizeLimitTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
/**
 * Test for using {@link BlobCacheSizeTracker} to track the size of BLOBs in {@link
 * PermanentBlobCache}. When new BLOBs are intended to be stored and the size limit is exceeded,
 * {@link BlobCacheSizeTracker} will provide the excess BLOBs for {@link PermanentBlobCache} to
 * delete.
 */
public class PermanentBlobCacheSizeLimitTest {

    private static final Random RANDOM = new Random();

    private static final BlobKey.BlobType BLOB_TYPE = BlobKey.BlobType.PERMANENT_BLOB;
    private static final int BLOB_SIZE = 10_000;
    // The size limit of the cache is the size of 2 BLOBs (MAX_NUM_OF_ACCEPTED_BLOBS * BLOB_SIZE),
    // while 3 BLOBs are uploaded in total, so exactly one must always be evicted.
    private static final int MAX_NUM_OF_ACCEPTED_BLOBS = 2;
    private static final int TOTAL_NUM_OF_BLOBS = 3;

    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();

    private Configuration config;

    @Before
    public void setup() throws IOException {
        config = new Configuration();
        // Point BLOB storage at a fresh temporary directory so each test starts empty.
        config.setString(
                BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
    }

    /**
     * Reads the BLOBs one after another and verifies that the cache evicts the least recently
     * used BLOB once the size limit (2 BLOBs) is exceeded.
     */
    @Test
    public void testTrackSizeLimitAndDeleteExcessSequentially() throws Exception {

        try (BlobServer server = new BlobServer(config, new VoidBlobStore());
                BlobCacheService cache =
                        initBlobCacheServiceWithSizeLimit(
                                config, new InetSocketAddress("localhost", server.getPort()))) {

            server.start();

            // Put the BLOBs into the blob server
            final BlobInfo[] blobs = putBlobsIntoBlobServer(server);

            // The cache retrieves the BLOBs from the server sequentially
            for (int i = 0; i < TOTAL_NUM_OF_BLOBS; i++) {

                // Retrieve the BLOB from the blob server
                readFileAndVerifyContent(cache, blobs[i].jobId, blobs[i].blobKey, blobs[i].data);

                // Retrieve the location of the BLOB from the blob cache; right after the read
                // the file must exist locally (eviction may remove it later)
                blobs[i].blobFile = getFile(cache, blobs[i].jobId, blobs[i].blobKey);
                assertTrue(blobs[i].blobFile.exists());
            }

            // Since the size limit of the blob cache is the size of 2 BLOBs,
            // the first BLOB is removed and the second BLOB remains
            assertFalse(blobs[0].blobFile.exists());
            assertTrue(blobs[1].blobFile.exists());

            // Retrieve the second BLOB once again,
            // making the third BLOB the least recently used one
            readFileAndVerifyContent(cache, blobs[1].jobId, blobs[1].blobKey, blobs[1].data);

            // Then retrieve the first BLOB again and make sure the third BLOB is the one
            // that gets evicted in its place
            blobs[0].blobKey = put(server, blobs[0].jobId, blobs[0].data, BLOB_TYPE);
            readFileAndVerifyContent(cache, blobs[0].jobId, blobs[0].blobKey, blobs[0].data);
            blobs[0].blobFile = getFile(cache, blobs[0].jobId, blobs[0].blobKey);

            assertTrue(blobs[0].blobFile.exists());
            assertTrue(blobs[1].blobFile.exists());
            assertFalse(blobs[2].blobFile.exists());
        }
    }

    /**
     * Reads all BLOBs concurrently and verifies that, regardless of interleaving, exactly
     * {@code MAX_NUM_OF_ACCEPTED_BLOBS} files remain on the cache afterwards.
     */
    @Test
    public void testTrackSizeLimitAndDeleteExcessConcurrently() throws Exception {

        final ExecutorService executor = Executors.newFixedThreadPool(TOTAL_NUM_OF_BLOBS);

        try (BlobServer server = new BlobServer(config, new VoidBlobStore());
                BlobCacheService cache =
                        initBlobCacheServiceWithSizeLimit(
                                config, new InetSocketAddress("localhost", server.getPort()))) {

            server.start();

            // Put the BLOBs into the blob server
            final BlobInfo[] blobs = putBlobsIntoBlobServer(server);

            final List<CompletableFuture<Void>> futures = new ArrayList<>(TOTAL_NUM_OF_BLOBS);

            // The blob cache retrieves the BLOBs from the server concurrently,
            // one dedicated thread per BLOB
            for (int i = 0; i < TOTAL_NUM_OF_BLOBS; i++) {
                int idx = i;
                CompletableFuture<Void> future =
                        CompletableFuture.supplyAsync(
                                () -> {
                                    try {
                                        // Retrieve the BLOB from the blob server
                                        readFileAndVerifyContent(
                                                cache,
                                                blobs[idx].jobId,
                                                blobs[idx].blobKey,
                                                blobs[idx].data);

                                        // Retrieve the location of the BLOB from the blob cache
                                        blobs[idx].blobFile =
                                                getFile(
                                                        cache,
                                                        blobs[idx].jobId,
                                                        blobs[idx].blobKey);

                                        return null;
                                    } catch (IOException e) {
                                        // Wrap the checked exception so it propagates
                                        // through the future
                                        throw new CompletionException(e);
                                    }
                                },
                                executor);

                futures.add(future);
            }

            final CompletableFuture<Void> conjunctFuture = FutureUtils.waitForAll(futures);
            conjunctFuture.get();

            // Check how many BLOBs remain in the blob cache: the limit must hold no matter
            // in which order the concurrent reads completed
            int exists = 0, nonExists = 0;
            for (int i = 0; i < TOTAL_NUM_OF_BLOBS; i++) {
                if (blobs[i].blobFile.exists()) {
                    exists++;
                } else {
                    nonExists++;
                }
            }
            assertEquals(MAX_NUM_OF_ACCEPTED_BLOBS, exists);
            assertEquals(TOTAL_NUM_OF_BLOBS - MAX_NUM_OF_ACCEPTED_BLOBS, nonExists);

        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * {@link BlobInfo} contains all the information related to a BLOB (for the test purpose
     * only): its job, its random payload, and — once known — its key and local cache file.
     */
    private static class BlobInfo {
        private final JobID jobId;
        private final byte[] data;
        // Assigned after the BLOB has been put into the blob server.
        private BlobKey blobKey;
        // Assigned after the BLOB has been fetched into the local cache.
        private File blobFile;

        private BlobInfo() {
            this.jobId = new JobID();

            this.data = new byte[BLOB_SIZE];
            RANDOM.nextBytes(this.data);
        }
    }

    /** Uploads {@code TOTAL_NUM_OF_BLOBS} randomly-filled BLOBs and returns their metadata. */
    private static BlobInfo[] putBlobsIntoBlobServer(BlobServer server) throws IOException {
        // Initialize the information of BLOBs
        BlobInfo[] blobs = new BlobInfo[TOTAL_NUM_OF_BLOBS];

        // Put all the BLOBs into the blob server one by one
        for (int i = 0; i < TOTAL_NUM_OF_BLOBS; i++) {
            blobs[i] = new BlobInfo();

            // Put the BLOB into the blob server
            blobs[i].blobKey = put(server, blobs[i].jobId, blobs[i].data, BLOB_TYPE);
            assertNotNull(blobs[i].blobKey);
        }

        return blobs;
    }

    /**
     * Creates a {@link BlobCacheService} whose permanent cache is limited to the size of
     * {@code MAX_NUM_OF_ACCEPTED_BLOBS} BLOBs via a {@link BlobCacheSizeTracker}.
     */
    private static BlobCacheService initBlobCacheServiceWithSizeLimit(
            Configuration config, @Nullable final InetSocketAddress serverAddress)
            throws IOException {

        final PermanentBlobCache permanentBlobCache =
                new PermanentBlobCache(
                        config,
                        new VoidBlobStore(),
                        serverAddress,
                        new BlobCacheSizeTracker(MAX_NUM_OF_ACCEPTED_BLOBS * BLOB_SIZE));

        final TransientBlobCache transientBlobCache = new TransientBlobCache(config, serverAddress);

        return new BlobCacheService(permanentBlobCache, transientBlobCache);
    }

    /** Reads the BLOB through the permanent blob service and verifies its content. */
    private static void readFileAndVerifyContent(
            BlobService blobService, JobID jobId, BlobKey blobKey, byte[] expected)
            throws IOException {

        assertNotNull(jobId);
        assertNotNull(blobKey);
        assertTrue(blobKey instanceof PermanentBlobKey);

        byte[] target =
                blobService.getPermanentBlobService().readFile(jobId, (PermanentBlobKey) blobKey);
        assertArrayEquals(expected, target);
    }

    /** Returns the local storage location of the BLOB on the cache. */
    private static File getFile(BlobCacheService blobCacheService, JobID jobId, BlobKey blobKey)
            throws IOException {
        return blobCacheService.getPermanentBlobService().getStorageLocation(jobId, blobKey);
    }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
new file mode 100644
index 0000000..4a37421
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.BlobCacheSizeTracker;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobCache;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.client.JobExecutionException;
+import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
+import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
/**
 * Tests {@link ExecutionGraph} deployment when job and task information are offloaded into the BLOB
 * server and accessed from the {@link PermanentBlobCache}. {@link PermanentBlobCache} uses {@link
 * BlobCacheSizeTracker} to track the size of ShuffleDescriptor BLOBs. The deployment works well
 * even when the size limit of {@link BlobCacheSizeTracker} in {@link PermanentBlobCache} is set to
 * the minimum value.
 */
public class DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest
        extends DefaultExecutionGraphDeploymentWithBlobCacheTest {

    @Before
    @Override
    public void setupBlobServer() throws IOException {
        Configuration config = new Configuration();
        // Always offload the serialized JobInformation, TaskInformation and cached
        // ShuffleDescriptors (minsize 0 forces offloading regardless of payload size)
        config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
        blobServer = new BlobServer(config, new VoidBlobStore());
        blobServer.start();
        blobWriter = blobServer;

        InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
        // Set the size limit of the blob cache to 1, the minimum possible value, so that
        // every newly fetched BLOB forces the eviction of all previously tracked BLOBs
        BlobCacheSizeTracker blobCacheSizeTracker = new BlobCacheSizeTracker(1L);
        blobCache =
                new PermanentBlobCache(
                        config, new VoidBlobStore(), serverAddress, blobCacheSizeTracker);
    }

    /**
     * Test the deployment works well even if the size limit of {@link BlobCacheSizeTracker} in
     * {@link PermanentBlobCache} is set to the minimum value.
     *
     * <p>In this extreme case, since the size limit is 1, every time a task is deployed, all the
     * existing **tracked** BLOBs on the cache must be untracked and deleted before the new BLOB is
     * stored onto the cache.
     *
     * <p>This extreme case covers the situation of the normal case, where the size limit is much
     * larger than 1 and the deletion won't happen so frequently.
     */
    @Test
    public void testDeployMultipleTasksWithSmallBlobCacheSizeLimit() throws Exception {

        final int numberOfVertices = 4;
        final int parallelism = 10;

        final ExecutionGraph eg = createAndSetupExecutionGraph(numberOfVertices, parallelism);

        final SimpleAckingTaskManagerGateway taskManagerGateway =
                new SimpleAckingTaskManagerGateway();
        // Queue capturing every TaskDeploymentDescriptor submitted to the (fake) task manager
        final BlockingQueue<TaskDeploymentDescriptor> tdds =
                new ArrayBlockingQueue<>(numberOfVertices * parallelism);
        taskManagerGateway.setSubmitConsumer(
                FunctionUtils.uncheckedConsumer(
                        taskDeploymentDescriptor -> {
                            // Resolve the offloaded data through the size-limited blob cache,
                            // which exercises the eviction path on every submission
                            taskDeploymentDescriptor.loadBigData(blobCache);
                            tdds.offer(taskDeploymentDescriptor);
                        }));

        for (ExecutionJobVertex ejv : eg.getVerticesTopologically()) {
            for (ExecutionVertex ev : ejv.getTaskVertices()) {

                assertEquals(ExecutionState.CREATED, ev.getExecutionState());

                LogicalSlot slot =
                        new TestingLogicalSlotBuilder()
                                .setTaskManagerGateway(taskManagerGateway)
                                .createTestingLogicalSlot();
                // Produced partitions must be registered before the vertex can be deployed
                ev.getCurrentExecutionAttempt()
                        .registerProducedPartitions(slot.getTaskManagerLocation(), true)
                        .get();
                ev.deployToSlot(slot);
                assertEquals(ExecutionState.DEPLOYING, ev.getExecutionState());

                TaskDeploymentDescriptor tdd = tdds.take();
                assertNotNull(tdd);

                List<InputGateDeploymentDescriptor> igdds = tdd.getInputGates();
                assertEquals(ev.getAllConsumedPartitionGroups().size(), igdds.size());

                if (igdds.size() > 0) {
                    checkShuffleDescriptors(igdds.get(0), ev.getConsumedPartitionGroup(0));
                }
            }
        }
    }

    /**
     * Builds a pointwise-connected chain of {@code numberOfVertices} batch vertices, wraps them
     * in a batch {@link JobGraph} and returns a started {@link ExecutionGraph} for it.
     */
    private ExecutionGraph createAndSetupExecutionGraph(int numberOfVertices, int parallelism)
            throws JobException, JobExecutionException {

        final List<JobVertex> vertices = new ArrayList<>();

        for (int i = 0; i < numberOfVertices; i++) {
            JobVertex vertex = new JobVertex(String.format("v%d", i + 1), new JobVertexID());
            vertex.setParallelism(parallelism);
            vertex.setInvokableClass(BatchTask.class);
            vertices.add(vertex);
        }

        // Connect each vertex to its predecessor with blocking pointwise edges
        for (int i = 1; i < numberOfVertices; i++) {
            vertices.get(i)
                    .connectNewDataSetAsInput(
                            vertices.get(i - 1),
                            DistributionPattern.POINTWISE,
                            ResultPartitionType.BLOCKING);
        }

        final JobGraph jobGraph =
                JobGraphTestUtils.batchJobGraph(vertices.toArray(new JobVertex[0]));

        final DirectScheduledExecutorService executor = new DirectScheduledExecutorService();
        final DefaultExecutionGraph eg =
                TestingDefaultExecutionGraphBuilder.newBuilder()
                        .setJobGraph(jobGraph)
                        .setFutureExecutor(executor)
                        .setIoExecutor(executor)
                        .setBlobWriter(blobWriter)
                        .build();

        eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());

        return eg;
    }

    /**
     * Verifies that the ShuffleDescriptors in the deployed input gate match the consumed
     * partition group, in order.
     */
    private static void checkShuffleDescriptors(
            InputGateDeploymentDescriptor igdd, ConsumedPartitionGroup consumedPartitionGroup) {
        int idx = 0;
        for (IntermediateResultPartitionID consumedPartitionId : consumedPartitionGroup) {
            assertEquals(
                    consumedPartitionId,
                    igdd.getShuffleDescriptors()[idx++].getResultPartitionID().getPartitionId());
        }
    }
}