This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 435a274 [NEMO-176] Improve sequential read from disk #92
435a274 is described below
commit 435a2740a1760e091f5b98b40c69820965108fc0
Author: Sanha Lee <[email protected]>
AuthorDate: Wed Aug 8 17:04:11 2018 +0900
[NEMO-176] Improve sequential read from disk #92
JIRA: [NEMO-176: Improve sequential read from disk](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-176)
**Major changes:**
- In `FileBlock`, read all of the required partition bytes from the file stream first, and decode them afterwards.
**Minor changes to note:**
- Remove the `limit` parameter from `InputStreamIterator` in `DataUtil`, and make `DataUtil#deserializePartition` itself limit the stream to the partition size.
- Remove the assumption in `OutputWriter` and `InputReader` that "the source and destination parallelism of One-to-One communication is always 1".
**Tests for the changes:**
- Existing `DataTransferTest`, `BlockStoreTest` and other unit tests cover
this change.
- Existing integration tests also cover this change.
**Other comments:**
- N/A.
Resolves [NEMO-176](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-176)
---
.../snu/nemo/runtime/executor/data/DataUtil.java | 43 ++++++--------------
.../runtime/executor/data/block/FileBlock.java | 31 ++++++---------
.../runtime/executor/datatransfer/InputReader.java | 9 +----
.../executor/datatransfer/OutputWriter.java | 46 ++++++++++++++--------
.../executor/datatransfer/DataTransferTest.java | 18 ++-------
5 files changed, 57 insertions(+), 90 deletions(-)
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
index 50c43d9..041283a 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
@@ -80,11 +80,17 @@ public final class DataUtil {
final InputStream inputStream)
throws IOException {
final List deserializedData = new ArrayList();
- final InputStreamIterator iterator = new
InputStreamIterator(Collections.singletonList(inputStream).iterator(),
- serializer, partitionSize);
- iterator.forEachRemaining(deserializedData::add);
- return new NonSerializedPartition(key, deserializedData,
iterator.getNumSerializedBytes(),
- iterator.getNumEncodedBytes());
+ // We need to limit read bytes on this inputStream, which could be
over-read by wrapped
+ // compression stream. This depends on the nature of the compression
algorithm used.
+ // We recommend to wrap with LimitedInputStream once more when
+ // reading input from chained compression InputStream.
+ try (final LimitedInputStream limitedInputStream = new
LimitedInputStream(inputStream, partitionSize)) {
+ final InputStreamIterator iterator =
+ new
InputStreamIterator(Collections.singletonList(limitedInputStream).iterator(),
serializer);
+ iterator.forEachRemaining(deserializedData::add);
+ return new NonSerializedPartition(key, deserializedData,
iterator.getNumSerializedBytes(),
+ iterator.getNumEncodedBytes());
+ }
}
/**
@@ -197,7 +203,6 @@ public final class DataUtil {
private final Iterator<InputStream> inputStreams;
private final Serializer<?, T> serializer;
- private final long limit;
private volatile CountingInputStream serializedCountingStream = null;
private volatile CountingInputStream encodedCountingStream = null;
@@ -218,27 +223,6 @@ public final class DataUtil {
final Serializer<?, T> serializer) {
this.inputStreams = inputStreams;
this.serializer = serializer;
- // -1 means no limit.
- this.limit = -1;
- }
-
- /**
- * Construct {@link Iterator} from {@link InputStream} and {@link
DecoderFactory}.
- *
- * @param inputStreams The streams to read data from.
- * @param serializer The serializer.
- * @param limit The bytes to read from the {@link InputStream}.
- */
- private InputStreamIterator(
- final Iterator<InputStream> inputStreams,
- final Serializer<?, T> serializer,
- final int limit) {
- if (limit < 0) {
- throw new IllegalArgumentException("Negative limit not allowed.");
- }
- this.inputStreams = inputStreams;
- this.serializer = serializer;
- this.limit = limit;
}
@Override
@@ -249,11 +233,6 @@ public final class DataUtil {
if (cannotContinueDecoding) {
return false;
}
- if (limit != -1 && limit == (serializedCountingStream == null
- ? numSerializedBytes : numSerializedBytes +
serializedCountingStream.getCount())) {
- cannotContinueDecoding = true;
- return false;
- }
while (true) {
try {
if (decoder == null) {
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index b9a04e4..6eef824 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -15,6 +15,7 @@
*/
package edu.snu.nemo.runtime.executor.data.block;
+import edu.snu.nemo.common.Pair;
import edu.snu.nemo.common.exception.BlockFetchException;
import edu.snu.nemo.common.exception.BlockWriteException;
import edu.snu.nemo.runtime.common.data.KeyRange;
@@ -172,36 +173,28 @@ public final class FileBlock<K extends Serializable>
implements Block<K> {
// Deserialize the data
final List<NonSerializedPartition<K>> deserializedPartitions = new
ArrayList<>();
try {
+ final List<Pair<K, byte[]>> partitionKeyBytesPairs = new ArrayList<>();
try (final FileInputStream fileStream = new FileInputStream(filePath))
{
for (final PartitionMetadata<K> partitionMetadata :
metadata.getPartitionMetadataList()) {
final K key = partitionMetadata.getKey();
if (keyRange.includes(key)) {
// The key value of this partition is in the range.
- final long availableBefore = fileStream.available();
- // We need to limit read bytes on this FileStream, which could
be over-read by wrapped
- // compression stream. This depends on the nature of the
compression algorithm used.
- // We recommend to wrap with LimitedInputStream once more when
- // reading input from chained compression InputStream.
- // Plus, this stream must be not closed to prevent to close the
filtered file partition.
- final LimitedInputStream limitedInputStream =
- new LimitedInputStream(fileStream,
partitionMetadata.getPartitionSize());
- final NonSerializedPartition<K> deserializePartition =
- DataUtil.deserializePartition(
- partitionMetadata.getPartitionSize(), serializer, key,
limitedInputStream);
- deserializedPartitions.add(deserializePartition);
- // rearrange file pointer
- final long toSkip = partitionMetadata.getPartitionSize() -
availableBefore + fileStream.available();
- if (toSkip > 0) {
- skipBytes(fileStream, toSkip);
- } else if (toSkip < 0) {
- throw new IOException("file stream has been overread");
- }
+ final byte[] partitionBytes = new
byte[partitionMetadata.getPartitionSize()];
+ fileStream.read(partitionBytes, 0,
partitionMetadata.getPartitionSize());
+ partitionKeyBytesPairs.add(Pair.of(key, partitionBytes));
} else {
// Have to skip this partition.
skipBytes(fileStream, partitionMetadata.getPartitionSize());
}
}
}
+ for (final Pair<K, byte[]> partitionKeyBytes : partitionKeyBytesPairs)
{
+ final NonSerializedPartition<K> deserializePartition =
+ DataUtil.deserializePartition(
+ partitionKeyBytes.right().length, serializer,
partitionKeyBytes.left(),
+ new ByteArrayInputStream(partitionKeyBytes.right()));
+ deserializedPartitions.add(deserializePartition);
+ }
} catch (final IOException e) {
throw new BlockFetchException(e);
}
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index e9e563b..4471ae4 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -170,13 +170,8 @@ public final class InputReader extends DataTransfer {
* @return the parallelism of the source task.
*/
public int getSourceParallelism() {
- if (CommunicationPatternProperty.Value.OneToOne
-
.equals(runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).get()))
{
- return 1;
- } else {
- final Integer numSrcTasks =
srcVertex.getPropertyValue(ParallelismProperty.class).get();
- return numSrcTasks;
- }
+ return srcVertex.getPropertyValue(ParallelismProperty.class).
+ orElseThrow(() -> new RuntimeException("No parallelism property on
this edge."));
}
/**
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index 6e4164a..71d810a 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -65,22 +65,28 @@ public final class OutputWriter extends DataTransfer
implements AutoCloseable {
this.srcVertexId = srcRuntimeVertexId;
this.dstIrVertex = dstIrVertex;
this.blockManagerWorker = blockManagerWorker;
- this.blockStoreValue =
runtimeEdge.getPropertyValue(DataStoreProperty.class).get();
+ this.blockStoreValue =
runtimeEdge.getPropertyValue(DataStoreProperty.class).
+ orElseThrow(() -> new RuntimeException("No data store property on the
edge"));
+
// Setup partitioner
- final int dstParallelism = getDstParallelism();
+ final int dstParallelism =
dstIrVertex.getPropertyValue(ParallelismProperty.class).
+ orElseThrow(() -> new RuntimeException("No parallelism property on the
destination vertex"));
final Optional<KeyExtractor> keyExtractor =
runtimeEdge.getPropertyValue(KeyExtractorProperty.class);
final PartitionerProperty.Value partitionerPropertyValue =
- runtimeEdge.getPropertyValue(PartitionerProperty.class).get();
+ runtimeEdge.getPropertyValue(PartitionerProperty.class).
+ orElseThrow(() -> new RuntimeException("No partitioner property on
the edge"));
switch (partitionerPropertyValue) {
case IntactPartitioner:
this.partitioner = new IntactPartitioner();
break;
case HashPartitioner:
- this.partitioner = new HashPartitioner(dstParallelism,
keyExtractor.get());
+ this.partitioner = new HashPartitioner(dstParallelism, keyExtractor.
+ orElseThrow(() -> new RuntimeException("No key extractor property
on the edge")));
break;
case DataSkewHashPartitioner:
- this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier,
dstParallelism, keyExtractor.get());
+ this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier,
dstParallelism, keyExtractor.
+ orElseThrow(() -> new RuntimeException("No key extractor property
on the edge")));
break;
case DedicatedKeyPerElementPartitioner:
this.partitioner = new DedicatedKeyPerElementPartitioner();
@@ -122,10 +128,8 @@ public final class OutputWriter extends DataTransfer
implements AutoCloseable {
public void close() {
// Commit block.
final DataPersistenceProperty.Value persistence =
- runtimeEdge.getPropertyValue(DataPersistenceProperty.class).get();
- final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
- runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
- final int multiplier = duplicateDataProperty.isPresent() ?
duplicateDataProperty.get().getGroupSize() : 1;
+ runtimeEdge.getPropertyValue(DataPersistenceProperty.class).
+ orElseThrow(() -> new RuntimeException("No data persistence
property on the edge"));
final boolean isDataSizeMetricCollectionEdge =
Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
.equals(runtimeEdge.getPropertyValue(MetricCollectionProperty.class));
@@ -138,11 +142,11 @@ public final class OutputWriter extends DataTransfer
implements AutoCloseable {
}
this.writtenBytes = blockSizeTotal;
blockManagerWorker.writeBlock(blockToWrite, blockStoreValue,
isDataSizeMetricCollectionEdge,
- partitionSizeMap.get(), srcVertexId, getDstParallelism() *
multiplier, persistence);
+ partitionSizeMap.get(), srcVertexId, getExpectedRead(), persistence);
} else {
this.writtenBytes = -1; // no written bytes info.
blockManagerWorker.writeBlock(blockToWrite, blockStoreValue,
isDataSizeMetricCollectionEdge,
- Collections.emptyMap(), srcVertexId, getDstParallelism() *
multiplier, persistence);
+ Collections.emptyMap(), srcVertexId, getExpectedRead(), persistence);
}
}
@@ -158,13 +162,21 @@ public final class OutputWriter extends DataTransfer
implements AutoCloseable {
}
/**
- * Get the parallelism of the destination task.
+ * Get the expected number of data read according to the communication
pattern of the edge and
+ * the parallelism of destination vertex.
*
- * @return the parallelism of the destination task.
+ * @return the expected number of data read.
*/
- private int getDstParallelism() {
- return CommunicationPatternProperty.Value.OneToOne.equals(
- runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).get())
- ? 1 : dstIrVertex.getPropertyValue(ParallelismProperty.class).get();
+ private int getExpectedRead() {
+ final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
+ runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
+ final int duplicatedDataMultiplier =
+ duplicateDataProperty.isPresent() ?
duplicateDataProperty.get().getGroupSize() : 1;
+ final int readForABlock =
CommunicationPatternProperty.Value.OneToOne.equals(
+
runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).orElseThrow(
+ () -> new RuntimeException("No communication pattern on this
edge.")))
+ ? 1 :
dstIrVertex.getPropertyValue(ParallelismProperty.class).orElseThrow(
+ () -> new RuntimeException("No parallelism property on the
destination vertex."));
+ return readForABlock * duplicatedDataMultiplier;
}
}
diff --git
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index b46f7ba..f309e49 100644
---
a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -334,11 +334,7 @@ public final class DataTransferTest {
final InputReader reader =
new InputReader(dstTaskIndex, srcVertex, dummyEdge, receiver);
- if (CommunicationPatternProperty.Value.OneToOne.equals(commPattern)) {
- assertEquals(1, reader.getSourceParallelism());
- } else {
- assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
- }
+ assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
final List dataRead = new ArrayList<>();
try {
@@ -436,17 +432,9 @@ public final class DataTransferTest {
final InputReader reader2 =
new InputReader(dstTaskIndex, srcVertex, dummyEdge2, receiver);
- if (CommunicationPatternProperty.Value.OneToOne.equals(commPattern)) {
- assertEquals(1, reader.getSourceParallelism());
- } else {
- assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
- }
+ assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
- if (CommunicationPatternProperty.Value.OneToOne.equals(commPattern)) {
- assertEquals(1, reader2.getSourceParallelism());
- } else {
- assertEquals(PARALLELISM_TEN, reader2.getSourceParallelism());
- }
+ assertEquals(PARALLELISM_TEN, reader2.getSourceParallelism());
final List dataRead = new ArrayList<>();
try {