johnyangk closed pull request #68: [NEMO-125] Fix data loss bug caused by
SailfishSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/68
This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one from a
fork) once it is merged, the diff is displayed below for the sake of
provenance:
diff --git
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
index ab1ece120..d5c50e4bf 100644
---
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
+++
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
@@ -32,9 +32,8 @@ public SailfishPass() {
new SailfishRelayReshapingPass(),
new SailfishEdgeDataFlowModelPass(),
new SailfishEdgeDataStorePass(),
- // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
- // new SailfishEdgeDecoderPass(),
- // new SailfishEdgeEncoderPass(),
+ new SailfishEdgeDecoderPass(),
+ new SailfishEdgeEncoderPass(),
new SailfishEdgeUsedDataHandlingPass()
));
}
diff --git
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
index 507d36542..0ffba2ee5 100644
---
a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
+++
b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/WordCountITCase.java
@@ -40,13 +40,13 @@
private static final String outputFileName = "sample_output_wordcount";
private static final String testResourceFileName = "test_output_wordcount";
private static final String executorResourceFileName = fileBasePath +
"beam_sample_executor_resources.json";
+ private static final String oneExecutorResourceFileName = fileBasePath +
"beam_sample_one_executor_resources.json";
private static final String inputFilePath = fileBasePath + inputFileName;
private static final String outputFilePath = fileBasePath + outputFileName;
@Before
public void setUp() throws Exception {
builder = new ArgBuilder()
- .addResourceJson(executorResourceFileName)
.addUserMain(WordCount.class.getCanonicalName())
.addUserArgs(inputFilePath, outputFilePath);
}
@@ -63,6 +63,7 @@ public void tearDown() throws Exception {
@Test (timeout = TIMEOUT)
public void test() throws Exception {
JobLauncher.main(builder
+ .addResourceJson(executorResourceFileName)
.addJobId(WordCountITCase.class.getSimpleName())
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
.build());
@@ -71,14 +72,25 @@ public void test() throws Exception {
@Test (timeout = TIMEOUT)
public void testSailfish() throws Exception {
JobLauncher.main(builder
+ .addResourceJson(executorResourceFileName)
.addJobId(WordCountITCase.class.getSimpleName() + "_sailfish")
.addOptimizationPolicy(SailfishPolicyParallelismFive.class.getCanonicalName())
.build());
}
+ @Test (timeout = TIMEOUT)
+ public void testSailfishInOneExecutor() throws Exception {
+ JobLauncher.main(builder
+ .addResourceJson(oneExecutorResourceFileName)
+ .addJobId(WordCountITCase.class.getSimpleName() +
"_sailfishInOneExecutor")
+
.addOptimizationPolicy(SailfishPolicyParallelismFive.class.getCanonicalName())
+ .build());
+ }
+
@Test (timeout = TIMEOUT)
public void testPado() throws Exception {
JobLauncher.main(builder
+ .addResourceJson(executorResourceFileName)
.addJobId(WordCountITCase.class.getSimpleName() + "_pado")
.addOptimizationPolicy(PadoPolicyParallelismFive.class.getCanonicalName())
.build());
diff --git a/examples/resources/beam_sample_one_executor_resources.json
b/examples/resources/beam_sample_one_executor_resources.json
new file mode 100644
index 000000000..069ed973d
--- /dev/null
+++ b/examples/resources/beam_sample_one_executor_resources.json
@@ -0,0 +1,7 @@
+[
+ {
+ "type": "Transient",
+ "memory_mb": 512,
+ "capacity": 5
+ }
+]
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
index 69eb9bab1..50c43d930 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
@@ -49,44 +49,39 @@ private DataUtil() {
/**
* Serializes the elements in a non-serialized partition into an output
stream.
*
- * @param encoderFactory the encoderFactory to encode the
elements.
+ * @param encoderFactory the encoderFactory to encode the elements.
* @param nonSerializedPartition the non-serialized partition to serialize.
* @param bytesOutputStream the output stream to write.
- * @return total number of elements in the partition.
* @throws IOException if fail to serialize.
*/
- public static long serializePartition(final EncoderFactory encoderFactory,
- final NonSerializedPartition
nonSerializedPartition,
- final OutputStream bytesOutputStream)
throws IOException {
- long elementsCount = 0;
+ private static void serializePartition(final EncoderFactory encoderFactory,
+ final NonSerializedPartition
nonSerializedPartition,
+ final OutputStream bytesOutputStream)
throws IOException {
final EncoderFactory.Encoder encoder =
encoderFactory.create(bytesOutputStream);
for (final Object element : nonSerializedPartition.getData()) {
encoder.encode(element);
- elementsCount++;
}
-
- return elementsCount;
}
/**
* Reads the data of a partition from an input stream and deserializes it.
*
- * @param elementsInPartition the number of elements in this partition.
- * @param serializer the serializer to decode the bytes.
- * @param key the key value of the result partition.
- * @param inputStream the input stream which will return the data in
the partition as bytes.
- * @param <K> the key type of the partitions.
+ * @param partitionSize the size of the partition to deserialize.
+ * @param serializer the serializer to decode the bytes.
+ * @param key the key value of the result partition.
+ * @param inputStream the input stream which will return the data in the
partition as bytes.
+ * @param <K> the key type of the partitions.
* @return the list of deserialized elements.
* @throws IOException if fail to deserialize.
*/
- public static <K extends Serializable> NonSerializedPartition
deserializePartition(final long elementsInPartition,
+ public static <K extends Serializable> NonSerializedPartition
deserializePartition(final int partitionSize,
final Serializer serializer,
final K key,
final InputStream inputStream)
throws IOException {
final List deserializedData = new ArrayList();
final InputStreamIterator iterator = new
InputStreamIterator(Collections.singletonList(inputStream).iterator(),
- serializer, elementsInPartition);
+ serializer, partitionSize);
iterator.forEachRemaining(deserializedData::add);
return new NonSerializedPartition(key, deserializedData,
iterator.getNumSerializedBytes(),
iterator.getNumEncodedBytes());
@@ -111,15 +106,14 @@ public static long serializePartition(final
EncoderFactory encoderFactory,
final DirectByteArrayOutputStream bytesOutputStream = new
DirectByteArrayOutputStream();
final OutputStream wrappedStream =
buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
) {
- final long elementsTotal =
- serializePartition(serializer.getEncoderFactory(),
partitionToConvert, wrappedStream);
+ serializePartition(serializer.getEncoderFactory(), partitionToConvert,
wrappedStream);
// We need to close wrappedStream on here, because
DirectByteArrayOutputStream:getBufDirectly() returns
// inner buffer directly, which can be an unfinished(not flushed)
buffer.
wrappedStream.close();
final byte[] serializedBytes = bytesOutputStream.getBufDirectly();
final int actualLength = bytesOutputStream.getCount();
serializedPartitions.add(
- new SerializedPartition<>(partitionToConvert.getKey(),
elementsTotal, serializedBytes, actualLength));
+ new SerializedPartition<>(partitionToConvert.getKey(),
serializedBytes, actualLength));
}
}
return serializedPartitions;
@@ -144,7 +138,7 @@ public static long serializePartition(final EncoderFactory
encoderFactory,
try (final ByteArrayInputStream byteArrayInputStream =
new ByteArrayInputStream(partitionToConvert.getData())) {
final NonSerializedPartition<K> deserializePartition =
deserializePartition(
- partitionToConvert.getElementsCount(), serializer, key,
byteArrayInputStream);
+ partitionToConvert.getLength(), serializer, key,
byteArrayInputStream);
nonSerializedPartitions.add(deserializePartition);
}
}
@@ -211,7 +205,6 @@ public static Iterable concatNonSerPartitions(final
Iterable<NonSerializedPartit
private volatile T next;
private volatile boolean cannotContinueDecoding = false;
private volatile DecoderFactory.Decoder<T> decoder = null;
- private volatile long elementsDecoded = 0;
private volatile long numSerializedBytes = 0;
private volatile long numEncodedBytes = 0;
@@ -221,8 +214,8 @@ public static Iterable concatNonSerPartitions(final
Iterable<NonSerializedPartit
* @param inputStreams The streams to read data from.
* @param serializer The serializer.
*/
- public InputStreamIterator(final Iterator<InputStream> inputStreams,
- final Serializer<?, T> serializer) {
+ InputStreamIterator(final Iterator<InputStream> inputStreams,
+ final Serializer<?, T> serializer) {
this.inputStreams = inputStreams;
this.serializer = serializer;
// -1 means no limit.
@@ -234,12 +227,12 @@ public InputStreamIterator(final Iterator<InputStream>
inputStreams,
*
* @param inputStreams The streams to read data from.
* @param serializer The serializer.
- * @param limit The number of elements from the {@link InputStream}.
+ * @param limit The bytes to read from the {@link InputStream}.
*/
- public InputStreamIterator(
+ private InputStreamIterator(
final Iterator<InputStream> inputStreams,
final Serializer<?, T> serializer,
- final long limit) {
+ final int limit) {
if (limit < 0) {
throw new IllegalArgumentException("Negative limit not allowed.");
}
@@ -256,7 +249,8 @@ public boolean hasNext() {
if (cannotContinueDecoding) {
return false;
}
- if (limit != -1 && limit == elementsDecoded) {
+ if (limit != -1 && limit == (serializedCountingStream == null
+ ? numSerializedBytes : numSerializedBytes +
serializedCountingStream.getCount())) {
cannotContinueDecoding = true;
return false;
}
@@ -280,7 +274,6 @@ public boolean hasNext() {
try {
next = decoder.decode();
hasNext = true;
- elementsDecoded++;
return true;
} catch (final IOException e) {
// IOException from decoder indicates EOF event.
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index 9a64d8601..41f125b65 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -78,8 +78,7 @@ private void writeToFile(final
Iterable<SerializedPartition<K>> serializedPartit
try (final FileOutputStream fileOutputStream = new
FileOutputStream(filePath, true)) {
for (final SerializedPartition<K> serializedPartition :
serializedPartitions) {
// Reserve a partition write and get the metadata.
- metadata.writePartitionMetadata(
- serializedPartition.getKey(), serializedPartition.getLength(),
serializedPartition.getElementsCount());
+ metadata.writePartitionMetadata(serializedPartition.getKey(),
serializedPartition.getLength());
fileOutputStream.write(serializedPartition.getData(), 0,
serializedPartition.getLength());
}
}
@@ -187,7 +186,7 @@ public void writeSerializedPartitions(final
Iterable<SerializedPartition<K>> par
new LimitedInputStream(fileStream,
partitionMetadata.getPartitionSize());
final NonSerializedPartition<K> deserializePartition =
DataUtil.deserializePartition(
- partitionMetadata.getElementsTotal(), serializer, key,
limitedInputStream);
+ partitionMetadata.getPartitionSize(), serializer, key,
limitedInputStream);
deserializedPartitions.add(deserializePartition);
// rearrange file pointer
final long toSkip = partitionMetadata.getPartitionSize() -
availableBefore + fileStream.available();
@@ -237,7 +236,7 @@ public void writeSerializedPartitions(final
Iterable<SerializedPartition<K>> par
throw new IOException("The read data size does not match with
the partition size.");
}
partitionsInRange.add(new SerializedPartition<>(
- key, partitionmetadata.getElementsTotal(), serializedData,
serializedData.length));
+ key, serializedData, serializedData.length));
} else {
// Have to skip this partition.
skipBytes(fileStream, partitionmetadata.getPartitionSize());
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
index 3eafe8ebb..108a4e79b 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/FileMetadata.java
@@ -58,18 +58,16 @@ public FileMetadata(final List<PartitionMetadata<K>>
partitionMetadataList) {
*
* @param key the key of the partition.
* @param partitionSize the size of the partition.
- * @param elementsTotal the number of elements in the partition.
* @throws IOException if fail to append the partition metadata.
*/
public final synchronized void writePartitionMetadata(final K key,
- final int
partitionSize,
- final long
elementsTotal) throws IOException {
+ final int
partitionSize) throws IOException {
if (committed.get()) {
throw new IOException("Cannot write a new block to a closed partition.");
}
final PartitionMetadata partitionMetadata =
- new PartitionMetadata(key, partitionSize, writtenBytesCursor,
elementsTotal);
+ new PartitionMetadata(key, partitionSize, writtenBytesCursor);
partitionMetadataList.add(partitionMetadata);
writtenBytesCursor += partitionSize;
}
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
index 9d68e5a39..2fa9de22d 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/PartitionMetadata.java
@@ -25,7 +25,6 @@
private final K key;
private final int partitionSize;
private final long offset;
- private final long elementsTotal;
/**
* Constructor.
@@ -33,16 +32,13 @@
* @param key the key of this partition.
* @param partitionSize the size of this partition.
* @param offset the offset of this partition.
- * @param elementsTotal the total number of elements in this partition.
*/
public PartitionMetadata(final K key,
final int partitionSize,
- final long offset,
- final long elementsTotal) {
+ final long offset) {
this.key = key;
this.partitionSize = partitionSize;
this.offset = offset;
- this.elementsTotal = elementsTotal;
}
/**
@@ -66,13 +62,6 @@ public long getOffset() {
return offset;
}
- /**
- * @return the total number of elements in this partition.
- */
- public long getElementsTotal() {
- return elementsTotal;
- }
-
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
@@ -82,8 +71,6 @@ public String toString() {
sb.append(partitionSize);
sb.append("/ offset: ");
sb.append(offset);
- sb.append("/ elementsTotal: ");
- sb.append(elementsTotal);
return sb.toString();
}
}
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
index 9a22e438d..8ad212635 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/metadata/RemoteFileMetadata.java
@@ -83,7 +83,6 @@ public synchronized void commitBlock() throws IOException {
dataOutputStream.write(key);
dataOutputStream.writeInt(partitionMetadata.getPartitionSize());
dataOutputStream.writeLong(partitionMetadata.getOffset());
- dataOutputStream.writeLong(partitionMetadata.getElementsTotal());
}
}
setCommitted(true);
@@ -127,7 +126,6 @@ public synchronized void commitBlock() throws IOException {
final PartitionMetadata<T> partitionMetadata = new PartitionMetadata<>(
SerializationUtils.deserialize(desKey),
dataInputStream.readInt(),
- dataInputStream.readLong(),
dataInputStream.readLong()
);
partitionMetadataList.add(partitionMetadata);
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
index d6d63d1e2..40150002e 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
@@ -32,7 +32,6 @@
*/
public final class SerializedPartition<K> implements Partition<byte[], K> {
private final K key;
- private volatile long elementsCount;
private volatile byte[] serializedData;
private volatile int length;
private volatile boolean committed;
@@ -52,7 +51,6 @@
public SerializedPartition(final K key,
final Serializer serializer) throws IOException {
this.key = key;
- this.elementsCount = 0;
this.serializedData = new byte[0];
this.length = 0;
this.committed = false;
@@ -66,16 +64,13 @@ public SerializedPartition(final K key,
* Data cannot be written to this partition after the construction.
*
* @param key the key.
- * @param elementsTotal the total number of elements.
* @param serializedData the serialized data.
* @param length the length of the actual serialized data. (It can
be different with serializedData.length)
*/
public SerializedPartition(final K key,
- final long elementsTotal,
final byte[] serializedData,
final int length) {
this.key = key;
- this.elementsCount = elementsTotal;
this.serializedData = serializedData;
this.length = length;
this.committed = true;
@@ -97,7 +92,6 @@ public void write(final Object element) throws IOException {
} else {
try {
encoder.encode(element);
- elementsCount++;
} catch (final IOException e) {
wrappedStream.close();
}
@@ -160,16 +154,4 @@ public int getLength() throws IOException {
return length;
}
}
-
- /**
- * @return the number of elements.
- * @throws IOException if the partition is not committed yet.
- */
- public long getElementsCount() throws IOException {
- if (!committed) {
- throw new IOException("The partition is not committed yet!");
- } else {
- return elementsCount;
- }
- }
}
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
index 7a4068ebc..36db3ec1c 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
@@ -32,8 +32,8 @@
private final int hashRangeMultiplier;
@Inject
- public DataTransferFactory(@Parameter(JobConf.HashRangeMultiplier.class)
final int hashRangeMultiplier,
- final BlockManagerWorker blockManagerWorker) {
+ private DataTransferFactory(@Parameter(JobConf.HashRangeMultiplier.class)
final int hashRangeMultiplier,
+ final BlockManagerWorker blockManagerWorker) {
this.hashRangeMultiplier = hashRangeMultiplier;
this.blockManagerWorker = blockManagerWorker;
}
@@ -41,7 +41,7 @@ public
DataTransferFactory(@Parameter(JobConf.HashRangeMultiplier.class) final i
/**
* Creates an {@link OutputWriter} between two stages.
*
- * @param srcIRVertex the {@link IRVertex} that outputs the data to be
written.
+ * @param srcIRVertex the {@link IRVertex} that outputs the data to be
written.
* @param srcTaskIdx the index of the source task.
* @param dstIRVertex the {@link IRVertex} that will take the output data as
its input.
* @param runtimeEdge that connects the srcTask to the tasks belonging to
dstIRVertex.
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index bb594f622..1744bb273 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -53,12 +53,12 @@
* @param runtimeEdge the {@link RuntimeEdge}.
* @param blockManagerWorker the {@link BlockManagerWorker}.
*/
- public OutputWriter(final int hashRangeMultiplier,
- final int srcTaskIdx,
- final String srcRuntimeVertexId,
- final IRVertex dstIrVertex,
- final RuntimeEdge<?> runtimeEdge,
- final BlockManagerWorker blockManagerWorker) {
+ OutputWriter(final int hashRangeMultiplier,
+ final int srcTaskIdx,
+ final String srcRuntimeVertexId,
+ final IRVertex dstIrVertex,
+ final RuntimeEdge<?> runtimeEdge,
+ final BlockManagerWorker blockManagerWorker) {
super(runtimeEdge.getId());
this.blockId = RuntimeIdGenerator.generateBlockId(getId(), srcTaskIdx);
this.runtimeEdge = runtimeEdge;
diff --git
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index df9454a9d..f80168519 100644
---
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -91,16 +91,21 @@
SourceVertex.class, ClientRPC.class, MetricManagerMaster.class})
public final class DataTransferTest {
private static final String EXECUTOR_ID_PREFIX = "Executor";
- private static final InterTaskDataStoreProperty.Value MEMORY_STORE =
InterTaskDataStoreProperty.Value.MemoryStore;
- private static final InterTaskDataStoreProperty.Value SER_MEMORY_STORE =
InterTaskDataStoreProperty.Value.SerializedMemoryStore;
- private static final InterTaskDataStoreProperty.Value LOCAL_FILE_STORE =
InterTaskDataStoreProperty.Value.LocalFileStore;
- private static final InterTaskDataStoreProperty.Value REMOTE_FILE_STORE =
InterTaskDataStoreProperty.Value.GlusterFileStore;
+ private static final InterTaskDataStoreProperty.Value MEMORY_STORE =
+ InterTaskDataStoreProperty.Value.MemoryStore;
+ private static final InterTaskDataStoreProperty.Value SER_MEMORY_STORE =
+ InterTaskDataStoreProperty.Value.SerializedMemoryStore;
+ private static final InterTaskDataStoreProperty.Value LOCAL_FILE_STORE =
+ InterTaskDataStoreProperty.Value.LocalFileStore;
+ private static final InterTaskDataStoreProperty.Value REMOTE_FILE_STORE =
+ InterTaskDataStoreProperty.Value.GlusterFileStore;
private static final String TMP_LOCAL_FILE_DIRECTORY = "./tmpLocalFiles";
private static final String TMP_REMOTE_FILE_DIRECTORY = "./tmpRemoteFiles";
private static final int PARALLELISM_TEN = 10;
private static final String EDGE_PREFIX_TEMPLATE = "Dummy(%d)";
private static final AtomicInteger TEST_INDEX = new AtomicInteger(0);
- private static final EncoderFactory ENCODER_FACTORY =
PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of());
+ private static final EncoderFactory ENCODER_FACTORY =
+ PairEncoderFactory.of(IntEncoderFactory.of(), IntEncoderFactory.of());
private static final DecoderFactory DECODER_FACTORY =
PairDecoderFactory.of(IntDecoderFactory.of(), IntDecoderFactory.of());
private static final Tang TANG = Tang.Factory.getTang();
@@ -108,6 +113,7 @@
private BlockManagerMaster master;
private BlockManagerWorker worker1;
+ private DataTransferFactory transferFactory;
private BlockManagerWorker worker2;
private HashMap<BlockManagerWorker, SerializerManager> serializerManagers =
new HashMap<>();
@@ -152,10 +158,12 @@ public void setUp() throws InjectionException {
injector2.bindVolatileParameter(JobConf.JobId.class, "data transfer test");
this.master = master;
- this.worker1 = createWorker(EXECUTOR_ID_PREFIX +
executorCount.getAndIncrement(), messageDispatcher,
- injector2);
+ final Pair<BlockManagerWorker, DataTransferFactory> pair1 =
+ createWorker(EXECUTOR_ID_PREFIX + executorCount.getAndIncrement(),
messageDispatcher, injector2);
+ this.worker1 = pair1.left();
+ this.transferFactory = pair1.right();
this.worker2 = createWorker(EXECUTOR_ID_PREFIX +
executorCount.getAndIncrement(), messageDispatcher,
- injector2);
+ injector2).left();
}
@After
@@ -164,8 +172,10 @@ public void tearDown() throws IOException {
FileUtils.deleteDirectory(new File(TMP_REMOTE_FILE_DIRECTORY));
}
- private BlockManagerWorker createWorker(final String executorId, final
LocalMessageDispatcher messageDispatcher,
- final Injector nameClientInjector) {
+ private Pair<BlockManagerWorker, DataTransferFactory> createWorker(
+ final String executorId,
+ final LocalMessageDispatcher messageDispatcher,
+ final Injector nameClientInjector) {
final LocalMessageEnvironment messageEnvironment = new
LocalMessageEnvironment(executorId, messageDispatcher);
final PersistentConnectionToMasterMap conToMaster = new
PersistentConnectionToMasterMap(messageEnvironment);
final Configuration executorConfiguration = TANG.newConfigurationBuilder()
@@ -180,11 +190,13 @@ private BlockManagerWorker createWorker(final String
executorId, final LocalMess
final BlockManagerWorker blockManagerWorker;
final MetricManagerWorker metricManagerWorker;
final SerializerManager serializerManager;
+ final DataTransferFactory dataTransferFactory;
try {
blockManagerWorker = injector.getInstance(BlockManagerWorker.class);
- metricManagerWorker = injector.getInstance(MetricManagerWorker.class);
+ metricManagerWorker = injector.getInstance(MetricManagerWorker.class);
serializerManager = injector.getInstance(SerializerManager.class);
serializerManagers.put(blockManagerWorker, serializerManager);
+ dataTransferFactory = injector.getInstance(DataTransferFactory.class);
} catch (final InjectionException e) {
throw new RuntimeException(e);
}
@@ -195,11 +207,11 @@ private BlockManagerWorker createWorker(final String
executorId, final LocalMess
conToMaster,
messageEnvironment,
serializerManager,
- new DataTransferFactory(HASH_RANGE_MULTIPLIER, blockManagerWorker),
+ dataTransferFactory,
metricManagerWorker);
injector.bindVolatileInstance(Executor.class, executor);
- return blockManagerWorker;
+ return Pair.of(blockManagerWorker, dataTransferFactory);
}
private Injector createNameClientInjector() {
@@ -336,8 +348,7 @@ private void writeAndRead(final BlockManagerWorker sender,
final List<List> dataWrittenList = new ArrayList<>();
IntStream.range(0, PARALLELISM_TEN).forEach(srcTaskIndex -> {
final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
- final OutputWriter writer = new OutputWriter(HASH_RANGE_MULTIPLIER,
srcTaskIndex, srcVertex.getId(), dstVertex,
- dummyEdge, sender);
+ final OutputWriter writer = transferFactory.createWriter(srcVertex,
srcTaskIndex, dstVertex, dummyEdge);
dataWritten.iterator().forEachRemaining(writer::write);
writer.close();
dataWrittenList.add(dataWritten);
@@ -432,14 +443,12 @@ private void writeAndReadWithDuplicateData(final
BlockManagerWorker sender,
final List<List> dataWrittenList = new ArrayList<>();
IntStream.range(0, PARALLELISM_TEN).forEach(srcTaskIndex -> {
final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
- final OutputWriter writer = new OutputWriter(HASH_RANGE_MULTIPLIER,
srcTaskIndex, srcVertex.getId(), dstVertex,
- dummyEdge, sender);
+ final OutputWriter writer = transferFactory.createWriter(srcVertex,
srcTaskIndex, dstVertex, dummyEdge);
dataWritten.iterator().forEachRemaining(writer::write);
writer.close();
dataWrittenList.add(dataWritten);
- final OutputWriter writer2 = new OutputWriter(HASH_RANGE_MULTIPLIER,
srcTaskIndex, srcVertex.getId(), dstVertex,
- dummyEdge2, sender);
+ final OutputWriter writer2 = transferFactory.createWriter(srcVertex,
srcTaskIndex, dstVertex, dummyEdge2);
dataWritten.iterator().forEachRemaining(writer2::write);
writer2.close();
});
diff --git
a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
index 5d24f8127..ab1beb7b7 100644
---
a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
+++
b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/SailfishPassTest.java
@@ -63,9 +63,8 @@ public void testSailfish() {
edgeToMerger.getPropertyValue(UsedDataHandlingProperty.class).get());
assertEquals(InterTaskDataStoreProperty.Value.SerializedMemoryStore,
edgeToMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
- // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
- //assertEquals(BytesDecoderFactory.of(),
- // edgeToMerger.getPropertyValue(DecoderProperty.class).get());
+ assertEquals(BytesDecoderFactory.of(),
+ edgeToMerger.getPropertyValue(DecoderProperty.class).get());
} else {
assertEquals(DataFlowModelProperty.Value.Pull,
edgeToMerger.getPropertyValue(DataFlowModelProperty.class).get());
@@ -78,9 +77,8 @@ public void testSailfish() {
edgeFromMerger.getPropertyValue(DataCommunicationPatternProperty.class).get());
assertEquals(InterTaskDataStoreProperty.Value.LocalFileStore,
edgeFromMerger.getPropertyValue(InterTaskDataStoreProperty.class).get());
- // TODO #125: Fix data loss bug caused by SailfishSchedulingPolicy
- //assertEquals(BytesEncoderFactory.of(),
- // edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
+ assertEquals(BytesEncoderFactory.of(),
+ edgeFromMerger.getPropertyValue(EncoderProperty.class).get());
});
} else {
// Non merger vertex.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services