jeongyooneo commented on a change in pull request #5: [NEMO-27] Element Wise Block Write
URL: https://github.com/apache/incubator-nemo/pull/5#discussion_r188491236
##########
File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
##########
@@ -54,98 +68,145 @@ public FileBlock(final Serializer serializer,
   /**
    * Writes the serialized data of this block having a specific key value as a partition to the file
    * where this block resides.
-   * Invariant: This method does not support concurrent write for a single block.
-   * Only one thread have to write at once.
+   * Invariant: This method does not support concurrent write.
    *
    * @param serializedPartitions the iterable of the serialized partitions to write.
    * @throws IOException if fail to write.
    */
-  private void writeSerializedPartitions(final Iterable<SerializedPartition<K>> serializedPartitions)
+  private void writeToFile(final Iterable<SerializedPartition<K>> serializedPartitions)
       throws IOException {
     try (final FileOutputStream fileOutputStream = new FileOutputStream(filePath, true)) {
       for (final SerializedPartition<K> serializedPartition : serializedPartitions) {
         // Reserve a partition write and get the metadata.
         metadata.writePartitionMetadata(
-            serializedPartition.getKey(), serializedPartition.getLength(), serializedPartition.getElementsTotal());
+            serializedPartition.getKey(), serializedPartition.getLength(), serializedPartition.getElementsCount());
         fileOutputStream.write(serializedPartition.getData(), 0, serializedPartition.getLength());
       }
     }
   }
 
+  /**
+   * Writes an element to non-committed block.
+   * Invariant: This should not be invoked after this block is committed.
+   * Invariant: This method does not support concurrent write.
+   *
+   * @param key     the key.
+   * @param element the element to write.
+   * @throws BlockWriteException for any error occurred while trying to write a block.
+   */
+  @Override
+  public void write(final K key,
+                    final Object element) throws BlockWriteException {
+    if (metadata.isCommitted()) {
+      throw new BlockWriteException(new Throwable("The partition is already committed!"));
+    } else {
+      try {
+        SerializedPartition<K> partition = nonCommittedPartitionsMap.get(key);
+        if (partition == null) {
+          partition = new SerializedPartition<>(key, serializer);
+          nonCommittedPartitionsMap.put(key, partition);
+        }
+        partition.write(element);
+      } catch (final IOException e) {
+        throw new BlockWriteException(e);
+      }
+    }
+  }
+
   /**
    * Writes {@link NonSerializedPartition}s to this block.
+   * Invariant: This method does not support concurrent write.
    *
    * @param partitions the {@link NonSerializedPartition}s to write.
-   * @throws IOException if fail to write.
+   * @throws BlockWriteException for any error occurred while trying to write a block.
    */
   @Override
-  public Optional<List<Long>> putPartitions(final Iterable<NonSerializedPartition<K>> partitions) throws IOException {
-    final Iterable<SerializedPartition<K>> convertedPartitions =
-        DataUtil.convertToSerPartitions(serializer, partitions);
-
-    return Optional.of(putSerializedPartitions(convertedPartitions));
+  public void writePartitions(final Iterable<NonSerializedPartition<K>> partitions)
+      throws BlockWriteException {
+    if (metadata.isCommitted()) {
+      throw new BlockWriteException(new Throwable("The partition is already committed!"));
+    } else {
+      try {
+        final Iterable<SerializedPartition<K>> convertedPartitions =
+            DataUtil.convertToSerPartitions(serializer, partitions);
+        writeSerializedPartitions(convertedPartitions);
+      } catch (final IOException e) {
+        throw new BlockWriteException(e);
+      }
+    }
   }
 
   /**
    * Writes {@link SerializedPartition}s to this block.
+   * Invariant: This method does not support concurrent write.
    *
    * @param partitions the {@link SerializedPartition}s to store.
-   * @throws IOException if fail to store.
+   * @throws BlockWriteException for any error occurred while trying to write a block.
    */
   @Override
-  public synchronized List<Long> putSerializedPartitions(final Iterable<SerializedPartition<K>> partitions)
-      throws IOException {
-    final List<Long> partitionSizeList = new ArrayList<>();
-    for (final SerializedPartition serializedPartition : partitions) {
-      partitionSizeList.add((long) serializedPartition.getLength());
+  public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> partitions)
+      throws BlockWriteException {
+    if (metadata.isCommitted()) {
+      throw new BlockWriteException(new Throwable("The partition is already committed!"));
+    } else {
+      try {
+        writeToFile(partitions);
+      } catch (final IOException e) {
+        throw new BlockWriteException(e);
+      }
     }
-    writeSerializedPartitions(partitions);
-
-    return partitionSizeList;
   }
 
   /**
    * Retrieves the partitions of this block from the file in a specific key range and deserializes it.
    *
    * @param keyRange the key range.
    * @return an iterable of {@link NonSerializedPartition}s.
-   * @throws IOException if failed to retrieve.
+   * @throws BlockFetchException for any error occurred while trying to fetch a block.
    */
   @Override
-  public Iterable<NonSerializedPartition<K>> getPartitions(final KeyRange keyRange) throws IOException {
-    // Deserialize the data
-    final List<NonSerializedPartition<K>> deserializedPartitions = new ArrayList<>();
-    try (final FileInputStream fileStream = new FileInputStream(filePath)) {
-      for (final PartitionMetadata<K> partitionMetadata : metadata.getPartitionMetadataIterable()) {
-        final K key = partitionMetadata.getKey();
-        if (keyRange.includes(key)) {
-          // The key value of this partition is in the range.
-          final long availableBefore = fileStream.available();
-          // We need to limit read bytes on this FileStream, which could be over-read by wrapped
-          // compression stream. We recommend to wrap with LimitedInputStream once more when
-          // reading input from chained compression InputStream.
-          // Plus, this stream must be not closed to prevent to close the filtered file partition.
-          final LimitedInputStream limitedInputStream =
-              new LimitedInputStream(fileStream, partitionMetadata.getPartitionSize());
-          final NonSerializedPartition<K> deserializePartition =
-              DataUtil.deserializePartition(
-                  partitionMetadata.getElementsTotal(), serializer, key, limitedInputStream);
-          deserializedPartitions.add(deserializePartition);
-          // rearrange file pointer
-          final long toSkip = partitionMetadata.getPartitionSize() - availableBefore + fileStream.available();
-          if (toSkip > 0) {
-            skipBytes(fileStream, toSkip);
-          } else if (toSkip < 0) {
-            throw new IOException("file stream has been overread");
+  public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange keyRange) throws BlockFetchException {
+    if (!metadata.isCommitted()) {
+      throw new BlockFetchException(new Throwable("Cannot retrieve elements before a block is committed"));
+    } else {
+      // Deserialize the data
+      final List<NonSerializedPartition<K>> deserializedPartitions = new ArrayList<>();
+      try {
+        try (final FileInputStream fileStream = new FileInputStream(filePath)) {
+          for (final PartitionMetadata<K> partitionMetadata : metadata.getPartitionMetadataList()) {
+            final K key = partitionMetadata.getKey();
+            if (keyRange.includes(key)) {
+              // The key value of this partition is in the range.
+              final long availableBefore = fileStream.available();
+              // We need to limit read bytes on this FileStream, which could be over-read by wrapped
Review comment:
> We need to limit read bytes on this FileStream, which could be over-read by wrapped compression stream.

Could you elaborate on this? Under what circumstances does this 'over-reading' occur? Also, how does 'limiting read bytes' prevent it?
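For reference, here is a minimal, self-contained sketch of the scenario as I currently understand it, so you can confirm or correct my reading. Everything in it is hypothetical illustration, not Nemo code: `OverReadDemo` and `BoundedStream` are names I made up (`BoundedStream` playing the role of `LimitedInputStream`), and GZIP stands in for whatever compression codec wraps the file stream. The idea seems to be that a decompressor buffers its input (`GZIPInputStream` pulls up to 512 bytes per fill), so wrapped directly around the shared `FileInputStream` it would consume bytes belonging to the next partition; capping the bytes it can see at the partition size keeps the file position at the partition boundary:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public final class OverReadDemo {

  /** Hypothetical stand-in for LimitedInputStream: serves at most {@code limit} bytes. */
  static final class BoundedStream extends FilterInputStream {
    private long remaining;

    BoundedStream(final InputStream in, final long limit) {
      super(in);
      this.remaining = limit;
    }

    @Override
    public int read() throws IOException {
      if (remaining <= 0) {
        return -1; // report EOF at the partition boundary
      }
      final int b = in.read();
      if (b >= 0) {
        remaining--;
      }
      return b;
    }

    @Override
    public int read(final byte[] buf, final int off, final int len) throws IOException {
      if (remaining <= 0) {
        return -1;
      }
      final int n = in.read(buf, off, (int) Math.min(len, remaining));
      if (n > 0) {
        remaining -= n;
      }
      return n;
    }
  }

  public static void main(final String[] args) throws IOException {
    // Two compressed partitions laid out back-to-back, as in a file block.
    final byte[] p1 = gzip("partition-one");
    final byte[] p2 = gzip("partition-two");
    final byte[] file = new byte[p1.length + p2.length];
    System.arraycopy(p1, 0, file, 0, p1.length);
    System.arraycopy(p2, 0, file, p1.length, p2.length);

    final ByteArrayInputStream raw = new ByteArrayInputStream(file);
    // GZIPInputStream fills an internal buffer (512 bytes by default); wrapped
    // directly around `raw` it would also swallow the start of partition two.
    // The bound makes it stop exactly at the boundary.
    final GZIPInputStream gzip = new GZIPInputStream(new BoundedStream(raw, p1.length));
    while (gzip.read() >= 0) {
      // drain partition one
    }
    // Deliberately not closing `gzip`: closing would cascade to the shared
    // underlying stream, which the FileBlock comment also warns against.

    // Prints p2.length: partition two is still fully available on `raw`.
    System.out.println("bytes left for partition two: " + raw.available());
  }

  private static byte[] gzip(final String s) throws IOException {
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (final GZIPOutputStream out = new GZIPOutputStream(bytes)) {
      out.write(s.getBytes(StandardCharsets.UTF_8));
    }
    return bytes.toByteArray();
  }
}
```

If that reading is right, it might be worth spelling it out in the code comment for future readers; if not, an explanation here would help.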
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services