jeongyooneo commented on a change in pull request #5: [NEMO-27] Element Wise Block Write
URL: https://github.com/apache/incubator-nemo/pull/5#discussion_r188491236
 
 

 ##########
 File path: runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
 ##########
 @@ -54,98 +68,145 @@ public FileBlock(final Serializer serializer,
   /**
   * Writes the serialized data of this block having a specific key value as a partition to the file
   * where this block resides.
-   * Invariant: This method does not support concurrent write for a single block.
-   *            Only one thread have to write at once.
+   * Invariant: This method does not support concurrent write.
    *
    * @param serializedPartitions the iterable of the serialized partitions to write.
    * @throws IOException if fail to write.
    */
-  private void writeSerializedPartitions(final Iterable<SerializedPartition<K>> serializedPartitions)
+  private void writeToFile(final Iterable<SerializedPartition<K>> serializedPartitions)
       throws IOException {
     try (final FileOutputStream fileOutputStream = new FileOutputStream(filePath, true)) {
       for (final SerializedPartition<K> serializedPartition : serializedPartitions) {
         // Reserve a partition write and get the metadata.
         metadata.writePartitionMetadata(
-            serializedPartition.getKey(), serializedPartition.getLength(), serializedPartition.getElementsTotal());
+            serializedPartition.getKey(), serializedPartition.getLength(), serializedPartition.getElementsCount());
         fileOutputStream.write(serializedPartition.getData(), 0, serializedPartition.getLength());
       }
     }
   }
 
+  /**
+   * Writes an element to non-committed block.
+   * Invariant: This should not be invoked after this block is committed.
+   * Invariant: This method does not support concurrent write.
+   *
+   * @param key     the key.
+   * @param element the element to write.
+   * @throws BlockWriteException for any error occurred while trying to write a block.
+   */
+  @Override
+  public void write(final K key,
+                    final Object element) throws BlockWriteException {
+    if (metadata.isCommitted()) {
+      throw new BlockWriteException(new Throwable("The partition is already committed!"));
+    } else {
+      try {
+        SerializedPartition<K> partition = nonCommittedPartitionsMap.get(key);
+        if (partition == null) {
+          partition = new SerializedPartition<>(key, serializer);
+          nonCommittedPartitionsMap.put(key, partition);
+        }
+        partition.write(element);
+      } catch (final IOException e) {
+        throw new BlockWriteException(e);
+      }
+    }
+  }
+
   /**
    * Writes {@link NonSerializedPartition}s to this block.
+   * Invariant: This method does not support concurrent write.
    *
    * @param partitions the {@link NonSerializedPartition}s to write.
-   * @throws IOException if fail to write.
+   * @throws BlockWriteException for any error occurred while trying to write a block.
    */
   @Override
-  public Optional<List<Long>> putPartitions(final Iterable<NonSerializedPartition<K>> partitions) throws IOException {
-    final Iterable<SerializedPartition<K>> convertedPartitions =
-        DataUtil.convertToSerPartitions(serializer, partitions);
-
-    return Optional.of(putSerializedPartitions(convertedPartitions));
+  public void writePartitions(final Iterable<NonSerializedPartition<K>> partitions)
+      throws BlockWriteException {
+    if (metadata.isCommitted()) {
+      throw new BlockWriteException(new Throwable("The partition is already committed!"));
+    } else {
+      try {
+        final Iterable<SerializedPartition<K>> convertedPartitions =
+            DataUtil.convertToSerPartitions(serializer, partitions);
+        writeSerializedPartitions(convertedPartitions);
+      } catch (final IOException e) {
+        throw new BlockWriteException(e);
+      }
+    }
   }
 
   /**
    * Writes {@link SerializedPartition}s to this block.
+   * Invariant: This method does not support concurrent write.
    *
    * @param partitions the {@link SerializedPartition}s to store.
-   * @throws IOException if fail to store.
+   * @throws BlockWriteException for any error occurred while trying to write a block.
    */
   @Override
-  public synchronized List<Long> putSerializedPartitions(final Iterable<SerializedPartition<K>> partitions)
-      throws IOException {
-    final List<Long> partitionSizeList = new ArrayList<>();
-    for (final SerializedPartition serializedPartition : partitions) {
-      partitionSizeList.add((long) serializedPartition.getLength());
+  public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> partitions)
+      throws BlockWriteException {
+    if (metadata.isCommitted()) {
+      throw new BlockWriteException(new Throwable("The partition is already committed!"));
+    } else {
+      try {
+        writeToFile(partitions);
+      } catch (final IOException e) {
+        throw new BlockWriteException(e);
+      }
     }
-    writeSerializedPartitions(partitions);
-
-    return partitionSizeList;
   }
 
   /**
    * Retrieves the partitions of this block from the file in a specific key range and deserializes it.
    *
    * @param keyRange the key range.
    * @return an iterable of {@link NonSerializedPartition}s.
-   * @throws IOException if failed to retrieve.
+   * @throws BlockFetchException for any error occurred while trying to fetch a block.
    */
   @Override
-  public Iterable<NonSerializedPartition<K>> getPartitions(final KeyRange keyRange) throws IOException {
-    // Deserialize the data
-    final List<NonSerializedPartition<K>> deserializedPartitions = new ArrayList<>();
-    try (final FileInputStream fileStream = new FileInputStream(filePath)) {
-      for (final PartitionMetadata<K> partitionMetadata : metadata.getPartitionMetadataIterable()) {
-        final K key = partitionMetadata.getKey();
-        if (keyRange.includes(key)) {
-          // The key value of this partition is in the range.
-          final long availableBefore = fileStream.available();
-          // We need to limit read bytes on this FileStream, which could be over-read by wrapped
-          // compression stream. We recommend to wrap with LimitedInputStream once more when
-          // reading input from chained compression InputStream.
-          // Plus, this stream must be not closed to prevent to close the filtered file partition.
-          final LimitedInputStream limitedInputStream =
-              new LimitedInputStream(fileStream, partitionMetadata.getPartitionSize());
-          final NonSerializedPartition<K> deserializePartition =
-              DataUtil.deserializePartition(
-                  partitionMetadata.getElementsTotal(), serializer, key, limitedInputStream);
-          deserializedPartitions.add(deserializePartition);
-          // rearrange file pointer
-          final long toSkip = partitionMetadata.getPartitionSize() - availableBefore + fileStream.available();
-          if (toSkip > 0) {
-            skipBytes(fileStream, toSkip);
-          } else if (toSkip < 0) {
-            throw new IOException("file stream has been overread");
+  public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange keyRange) throws BlockFetchException {
+    if (!metadata.isCommitted()) {
+      throw new BlockFetchException(new Throwable("Cannot retrieve elements before a block is committed"));
+    } else {
+      // Deserialize the data
+      final List<NonSerializedPartition<K>> deserializedPartitions = new ArrayList<>();
+      try {
+        try (final FileInputStream fileStream = new FileInputStream(filePath)) {
+          for (final PartitionMetadata<K> partitionMetadata : metadata.getPartitionMetadataList()) {
+            final K key = partitionMetadata.getKey();
+            if (keyRange.includes(key)) {
+              // The key value of this partition is in the range.
+              final long availableBefore = fileStream.available();
+              // We need to limit read bytes on this FileStream, which could be over-read by wrapped
 
 Review comment:
   > We need to limit read bytes on this FileStream, which could be over-read by wrapped compression stream.
   
   Could you elaborate on this? Under what circumstances does this 'over-reading' occur? Also, how does 'limiting read bytes' prevent it?
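   
   To make my question concrete, here is a minimal sketch of the failure mode as I currently understand it (an assumption on my part, not a claim about this codebase): a decompressor such as `java.util.zip.GZIPInputStream` reads from the underlying stream in fixed-size chunks, so when it wraps the shared `FileInputStream` it can buffer bytes that belong to the *next* partition. A byte-capping decorator would make the decompressor see EOF exactly at the partition boundary. The `BoundedStream` below is a hypothetical stand-in I wrote to mirror what the name `LimitedInputStream` suggests; it is not the project's actual class.
   
   ```java
   import java.io.FilterInputStream;
   import java.io.IOException;
   import java.io.InputStream;
   
   // Hypothetical stand-in for a byte-limiting decorator (not the project's LimitedInputStream).
   final class BoundedStream extends FilterInputStream {
     private long remaining; // bytes of the current partition still readable
   
     BoundedStream(final InputStream in, final long limit) {
       super(in);
       this.remaining = limit;
     }
   
     @Override
     public int read() throws IOException {
       if (remaining <= 0) {
         return -1; // report EOF at the partition boundary
       }
       final int b = in.read();
       if (b >= 0) {
         remaining--;
       }
       return b;
     }
   
     @Override
     public int read(final byte[] buf, final int off, final int len) throws IOException {
       if (remaining <= 0) {
         return -1; // keep a wrapped decompressor from buffering past this partition
       }
       final int read = in.read(buf, off, (int) Math.min(len, remaining));
       if (read > 0) {
         remaining -= read;
       }
       return read;
     }
   }
   
   // Usage sketch: the decompressor can no longer consume the next partition's bytes.
   // final InputStream partitionIn =
   //     new GZIPInputStream(new BoundedStream(fileStream, partitionMetadata.getPartitionSize()));
   ```
   
   Is this the scenario the comment is guarding against? If so, it would help to state it in the comment itself.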
