Repository: tez Updated Branches: refs/heads/master 5f953bfd9 -> 57c857d26
TEZ-1526. LoadingCache for TezTaskID slow for large jobs (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/57c857d2 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/57c857d2 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/57c857d2 Branch: refs/heads/master Commit: 57c857d267f17dd4e47b53d7691996d73c4476a1 Parents: 5f953bf Author: Jonathan Eagles <[email protected]> Authored: Tue Mar 14 15:11:36 2017 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Tue Mar 14 15:11:36 2017 -0500 ---------------------------------------------------------------------- .../org/apache/tez/dag/records/TezDAGID.java | 64 +++++++------------- .../java/org/apache/tez/dag/records/TezID.java | 21 +++++++ .../tez/dag/records/TezTaskAttemptID.java | 57 ++++++----------- .../org/apache/tez/dag/records/TezTaskID.java | 51 ++++++---------- .../org/apache/tez/dag/records/TezVertexID.java | 48 ++++++--------- ...tesianProductVertexManagerUnpartitioned.java | 6 +- 6 files changed, 103 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java index b7a2c8f..2e3309e 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezDAGID.java @@ -21,15 +21,12 @@ package org.apache.tez.dag.records; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.text.NumberFormat; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.yarn.api.records.ApplicationId; import com.google.common.base.Preconditions; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; +import org.apache.tez.util.FastNumberFormat; /** * TezDAGID represents the immutable and unique identifier for @@ -43,16 +40,7 @@ import com.google.common.cache.LoadingCache; */ public class TezDAGID extends TezID { - private static LoadingCache<TezDAGID, TezDAGID> dagIdCache = CacheBuilder.newBuilder().softValues(). - build( - new CacheLoader<TezDAGID, TezDAGID>() { - @Override - public TezDAGID load(TezDAGID key) throws Exception { - return key; - } - } - ); - + private static TezIDCache<TezDAGID> tezDAGIDCache = new TezIDCache<>(); private ApplicationId applicationId; /** @@ -65,13 +53,12 @@ public class TezDAGID extends TezID { // will be short-lived. // Alternately the cache can be keyed by the hash of the incoming paramters. Preconditions.checkArgument(applicationId != null, "ApplicationID cannot be null"); - return dagIdCache.getUnchecked(new TezDAGID(applicationId, id)); + return tezDAGIDCache.getInstance(new TezDAGID(applicationId, id)); } @InterfaceAudience.Private public static void clearCache() { - dagIdCache.invalidateAll(); - dagIdCache.cleanUp(); + tezDAGIDCache.clear(); } /** @@ -85,7 +72,7 @@ public class TezDAGID extends TezID { // will be short-lived. // Alternately the cache can be keyed by the hash of the incoming paramters. Preconditions.checkArgument(yarnRMIdentifier != null, "yarnRMIdentifier cannot be null"); - return dagIdCache.getUnchecked(new TezDAGID(yarnRMIdentifier, appId, id)); + return tezDAGIDCache.getInstance(new TezDAGID(yarnRMIdentifier, appId, id)); } // Public for Writable serialization. Verify if this is actually required. @@ -151,25 +138,14 @@ public class TezDAGID extends TezID { // DO NOT CHANGE THIS. DAGClient replicates this code to create DAG id string public static final String DAG = "dag"; - static final ThreadLocal<NumberFormat> tezAppIdFormat = new ThreadLocal<NumberFormat>() { + static final ThreadLocal<FastNumberFormat> tezAppIdFormat = new ThreadLocal<FastNumberFormat>() { @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); + public FastNumberFormat initialValue() { + FastNumberFormat fmt = FastNumberFormat.getInstance(); fmt.setMinimumIntegerDigits(4); return fmt; } }; - - static final ThreadLocal<NumberFormat> tezDagIdFormat = new ThreadLocal<NumberFormat>() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(1); - return fmt; - } - }; @Override public String toString() { @@ -190,10 +166,15 @@ public class TezDAGID extends TezID { throw new IllegalArgumentException("numDagsPerGroup has to be more than one. Got: " + numDagsPerGroup); } - return DAG_GROUPID_PREFIX + SEPARATOR + - getApplicationId().getClusterTimestamp() + SEPARATOR + - tezAppIdFormat.get().format(getApplicationId().getId()) + SEPARATOR + - tezDagIdFormat.get().format((getId() - 1) / numDagsPerGroup); + StringBuilder sb = new StringBuilder(); + sb.append(DAG_GROUPID_PREFIX); + sb.append(SEPARATOR); + sb.append(getApplicationId().getClusterTimestamp()); + sb.append(SEPARATOR); + tezAppIdFormat.get().format(getApplicationId().getId(), sb); + sb.append(SEPARATOR); + sb.append((id - 1) / numDagsPerGroup); + return sb.toString(); } public static TezDAGID fromString(String dagId) { @@ -225,12 +206,11 @@ public class TezDAGID extends TezID { * @return the builder that was passed in */ protected StringBuilder appendTo(StringBuilder builder) { - return builder.append(SEPARATOR). - append(applicationId.getClusterTimestamp()). - append(SEPARATOR). - append(tezAppIdFormat.get().format(applicationId.getId())). - append(SEPARATOR). - append(tezDagIdFormat.get().format(id)); + builder.append(SEPARATOR); + builder.append(applicationId.getClusterTimestamp()); + builder.append(SEPARATOR); + tezAppIdFormat.get().format(applicationId.getId(), builder); + return builder.append(SEPARATOR).append(id); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java index 7efbd9a..cd7b27d 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezID.java @@ -21,6 +21,8 @@ package org.apache.tez.dag.records; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.lang.ref.WeakReference; +import java.util.WeakHashMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -40,6 +42,25 @@ public abstract class TezID implements WritableComparable<TezID> { public static final char SEPARATOR = '_'; protected int id; + public static class TezIDCache<T> { + private final WeakHashMap<T, WeakReference<T>> cache = new WeakHashMap<>(); + + synchronized T getInstance(final T id) { + final WeakReference<T> cached = cache.get(id); + if (cached != null) { + final T value = cached.get(); + if (value != null) + return value; + } + cache.put(id, new WeakReference<T>(id)); + return id; + } + + synchronized void clear() { + cache.clear(); + } + } + /** constructs an ID object from the given int */ public TezID(int id) { this.id = id; http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java index 296d577..7aee80f 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskAttemptID.java @@ -21,15 +21,10 @@ package org.apache.tez.dag.records; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.text.NumberFormat; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; - /** * TezTaskAttemptID represents the immutable and unique identifier for * a task attempt. Each task attempt is one particular instance of a Tez Task @@ -50,17 +45,9 @@ import com.google.common.cache.LoadingCache; public class TezTaskAttemptID extends TezID { public static final String ATTEMPT = "attempt"; private TezTaskID taskId; - - private static LoadingCache<TezTaskAttemptID, TezTaskAttemptID> taskAttemptIDCache = CacheBuilder.newBuilder().softValues(). - build( - new CacheLoader<TezTaskAttemptID, TezTaskAttemptID>() { - @Override - public TezTaskAttemptID load(TezTaskAttemptID key) throws Exception { - return key; - } - } - ); - + + private static TezIDCache<TezTaskAttemptID> tezTaskAttemptIDCache = new TezIDCache<>(); + // Public for Writable serialization. Verify if this is actually required. public TezTaskAttemptID() { } @@ -71,13 +58,12 @@ public class TezTaskAttemptID extends TezID { * @param id the task attempt number */ public static TezTaskAttemptID getInstance(TezTaskID taskID, int id) { - return taskAttemptIDCache.getUnchecked(new TezTaskAttemptID(taskID, id)); + return tezTaskAttemptIDCache.getInstance(new TezTaskAttemptID(taskID, id)); } @InterfaceAudience.Private public static void clearCache() { - taskAttemptIDCache.invalidateAll(); - taskAttemptIDCache.cleanUp(); + tezTaskAttemptIDCache.clear(); } private TezTaskAttemptID(TezTaskID taskId, int id) { @@ -108,7 +94,9 @@ public class TezTaskAttemptID extends TezID { * @return the builder that was passed in. */ protected StringBuilder appendTo(StringBuilder builder) { - return taskId.appendTo(builder).append(SEPARATOR).append(id); + taskId.appendTo(builder); + builder.append(SEPARATOR); + return builder.append(id); } @Override @@ -151,25 +139,20 @@ public class TezTaskAttemptID extends TezID { super.write(out); } - protected static final ThreadLocal<NumberFormat> tezTaskAttemptIdFormat = new ThreadLocal<NumberFormat>() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(1); - return fmt; - } - }; - public static TezTaskAttemptID fromString(String taIdStr) { try { - String[] split = taIdStr.split("_"); - String rmId = split[1]; - int appId = TezDAGID.tezAppIdFormat.get().parse(split[2]).intValue(); - int dagId = TezDAGID.tezDagIdFormat.get().parse(split[3]).intValue(); - int vId = TezVertexID.tezVertexIdFormat.get().parse(split[4]).intValue(); - int taskId = TezTaskID.tezTaskIdFormat.get().parse(split[5]).intValue(); - int id = tezTaskAttemptIdFormat.get().parse(split[6]).intValue(); + int pos1 = taIdStr.indexOf(SEPARATOR); + int pos2 = taIdStr.indexOf(SEPARATOR, pos1 + 1); + int pos3 = taIdStr.indexOf(SEPARATOR, pos2 + 1); + int pos4 = taIdStr.indexOf(SEPARATOR, pos3 + 1); + int pos5 = taIdStr.indexOf(SEPARATOR, pos4 + 1); + int pos6 = taIdStr.indexOf(SEPARATOR, pos5 + 1); + String rmId = taIdStr.substring(pos1 + 1, pos2); + int appId = Integer.parseInt(taIdStr.substring(pos2 + 1, pos3)); + int dagId = Integer.parseInt(taIdStr.substring(pos3 + 1, pos4)); + int vId = Integer.parseInt(taIdStr.substring(pos4 + 1, pos5)); + int taskId = Integer.parseInt(taIdStr.substring(pos5 + 1, pos6)); + int id = Integer.parseInt(taIdStr.substring(pos6 + 1)); return TezTaskAttemptID.getInstance( TezTaskID.getInstance( http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java index 3d28348..3295f6a 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezTaskID.java @@ -21,16 +21,12 @@ package org.apache.tez.dag.records; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.text.NumberFormat; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import com.google.common.base.Preconditions; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; - +import org.apache.tez.util.FastNumberFormat; /** * TaskID represents the immutable and unique identifier for @@ -46,26 +42,16 @@ public class TezTaskID extends TezID { public static final String TASK = "task"; private final int serializingHash; - static final ThreadLocal<NumberFormat> tezTaskIdFormat = new ThreadLocal<NumberFormat>() { + static final ThreadLocal<FastNumberFormat> tezTaskIdFormat = new ThreadLocal<FastNumberFormat>() { @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); + public FastNumberFormat initialValue() { + FastNumberFormat fmt = FastNumberFormat.getInstance(); fmt.setMinimumIntegerDigits(6); return fmt; } }; - private static LoadingCache<TezTaskID, TezTaskID> taskIDCache = CacheBuilder.newBuilder().softValues(). - build( - new CacheLoader<TezTaskID, TezTaskID>() { - @Override - public TezTaskID load(TezTaskID key) throws Exception { - return key; - } - } - ); - + private static TezIDCache<TezTaskID> tezTaskIDCache = new TezIDCache<>(); private TezVertexID vertexId; /** @@ -75,13 +61,12 @@ public class TezTaskID extends TezID { */ public static TezTaskID getInstance(TezVertexID vertexID, int id) { Preconditions.checkArgument(vertexID != null, "vertexID cannot be null"); - return taskIDCache.getUnchecked(new TezTaskID(vertexID, id)); + return tezTaskIDCache.getInstance(new TezTaskID(vertexID, id)); } @InterfaceAudience.Private public static void clearCache() { - taskIDCache.invalidateAll(); - taskIDCache.cleanUp(); + tezTaskIDCache.clear(); } private TezTaskID(TezVertexID vertexID, int id) { @@ -130,9 +115,9 @@ public class TezTaskID extends TezID { * @return the builder that was passed in */ protected StringBuilder appendTo(StringBuilder builder) { - return vertexId.appendTo(builder). - append(SEPARATOR). - append(tezTaskIdFormat.get().format(id)); + vertexId.appendTo(builder); + builder.append(SEPARATOR); + return tezTaskIdFormat.get().format(id, builder); } @Override @@ -170,12 +155,16 @@ public class TezTaskID extends TezID { public static TezTaskID fromString(String taskIdStr) { try { - String[] split = taskIdStr.split("_"); - String rmId = split[1]; - int appId = TezDAGID.tezAppIdFormat.get().parse(split[2]).intValue(); - int dagId = TezDAGID.tezDagIdFormat.get().parse(split[3]).intValue(); - int vId = TezVertexID.tezVertexIdFormat.get().parse(split[4]).intValue(); - int id = tezTaskIdFormat.get().parse(split[5]).intValue(); + int pos1 = taskIdStr.indexOf(SEPARATOR); + int pos2 = taskIdStr.indexOf(SEPARATOR, pos1 + 1); + int pos3 = taskIdStr.indexOf(SEPARATOR, pos2 + 1); + int pos4 = taskIdStr.indexOf(SEPARATOR, pos3 + 1); + int pos5 = taskIdStr.indexOf(SEPARATOR, pos4 + 1); + String rmId = taskIdStr.substring(pos1 + 1, pos2); + int appId = Integer.parseInt(taskIdStr.substring(pos2 + 1, pos3)); + int dagId = Integer.parseInt(taskIdStr.substring(pos3 + 1, pos4)); + int vId = Integer.parseInt(taskIdStr.substring(pos4 + 1, pos5)); + int id = Integer.parseInt(taskIdStr.substring(pos5 + 1)); return TezTaskID.getInstance( TezVertexID.getInstance( http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java index d30df16..b56c9ad 100644 --- a/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java +++ b/tez-common/src/main/java/org/apache/tez/dag/records/TezVertexID.java @@ -21,15 +21,12 @@ package org.apache.tez.dag.records; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.text.NumberFormat; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import com.google.common.base.Preconditions; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; +import org.apache.tez.util.FastNumberFormat; /** * TezVertexID represents the immutable and unique identifier for @@ -46,27 +43,17 @@ import com.google.common.cache.LoadingCache; @InterfaceStability.Stable public class TezVertexID extends TezID { public static final String VERTEX = "vertex"; - static final ThreadLocal<NumberFormat> tezVertexIdFormat = new ThreadLocal<NumberFormat>() { + static final ThreadLocal<FastNumberFormat> tezVertexIdFormat = new ThreadLocal<FastNumberFormat>() { @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); + public FastNumberFormat initialValue() { + FastNumberFormat fmt = FastNumberFormat.getInstance(); fmt.setMinimumIntegerDigits(2); return fmt; } }; - private static LoadingCache<TezVertexID, TezVertexID> vertexIDCache = CacheBuilder.newBuilder().softValues(). - build( - new CacheLoader<TezVertexID, TezVertexID>() { - @Override - public TezVertexID load(TezVertexID key) throws Exception { - return key; - } - } - ); - + private static TezIDCache<TezVertexID> tezVertexIDCache = new TezIDCache<>(); private TezDAGID dagId; // Public for Writable serialization. Verify if this is actually required. @@ -80,13 +67,12 @@ public class TezVertexID extends TezID { */ public static TezVertexID getInstance(TezDAGID dagId, int id) { Preconditions.checkArgument(dagId != null, "DagID cannot be null"); - return vertexIDCache.getUnchecked(new TezVertexID(dagId, id)); + return tezVertexIDCache.getInstance(new TezVertexID(dagId, id)); } @InterfaceAudience.Private public static void clearCache() { - vertexIDCache.invalidateAll(); - vertexIDCache.cleanUp(); + tezVertexIDCache.clear(); } private TezVertexID(TezDAGID dagId, int id) { @@ -146,9 +132,9 @@ public class TezVertexID extends TezID { * @return the builder that was passed in */ protected StringBuilder appendTo(StringBuilder builder) { - return dagId.appendTo(builder). - append(SEPARATOR). - append(tezVertexIdFormat.get().format(id)); + dagId.appendTo(builder); + builder.append(SEPARATOR); + return tezVertexIdFormat.get().format(id, builder); } @Override @@ -158,12 +144,14 @@ public class TezVertexID extends TezID { public static TezVertexID fromString(String vertexIdStr) { try { - String[] split = vertexIdStr.split("_"); - String rmId = split[1]; - int appId = TezDAGID.tezAppIdFormat.get().parse(split[2]).intValue(); - int dagId = TezDAGID.tezDagIdFormat.get().parse(split[3]).intValue(); - int id = tezVertexIdFormat.get().parse(split[4]).intValue(); - + int pos1 = vertexIdStr.indexOf(SEPARATOR); + int pos2 = vertexIdStr.indexOf(SEPARATOR, pos1 + 1); + int pos3 = vertexIdStr.indexOf(SEPARATOR, pos2 + 1); + int pos4 = vertexIdStr.indexOf(SEPARATOR, pos3 + 1); + String rmId = vertexIdStr.substring(pos1 + 1, pos2); + int appId = Integer.parseInt(vertexIdStr.substring(pos2 + 1, pos3)); + int dagId = Integer.parseInt(vertexIdStr.substring(pos3 + 1, pos4)); + int id = Integer.parseInt(vertexIdStr.substring(pos4 + 1)); return TezVertexID.getInstance( TezDAGID.getInstance(rmId, appId, dagId), id); http://git-wip-us.apache.org/repos/asf/tez/blob/57c857d2/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java index 31a3941..d2ce378 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java @@ -290,11 +290,10 @@ public class TestCartesianProductVertexManagerUnpartitioned { VertexManagerEvent vmEvent = VertexManagerEvent.create("cp vertex", proto.toByteString().asReadOnlyByteBuffer()); - Formatter formatter = new Formatter(); for (int i = 0; i < desiredBytesPerGroup/outputBytesPerTaskV0; i++) { vmEvent.setProducerAttemptIdentifier( new TaskAttemptIdentifierImpl("dag", "v0", TezTaskAttemptID.fromString( - formatter.format("attempt_1441301219877_0109_1_00_%06d_0", i).toString()))); + String.format("attempt_1441301219877_0109_1_00_%06d_0", i)))); vertexManager.onVertexManagerEventReceived(vmEvent); } verify(context, never()).reconfigureVertex(anyInt(), any(VertexLocationHint.class), @@ -313,10 +312,9 @@ public class TestCartesianProductVertexManagerUnpartitioned { anyMapOf(String.class, EdgeProperty.class)); vmEvent.setProducerAttemptIdentifier( new TaskAttemptIdentifierImpl("dag", "v1", TezTaskAttemptID.fromString( - formatter.format("attempt_1441301219877_0109_1_01_%06d_0", i).toString()))); + String.format("attempt_1441301219877_0109_1_01_%06d_0", i)))); vertexManager.onVertexManagerEventReceived(vmEvent); } - formatter.close(); verify(context, times(1)).reconfigureVertex(parallelismCaptor.capture(), isNull(VertexLocationHint.class), edgePropertiesCaptor.capture()); Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
