wonook closed pull request #12: [NEMO-39] SonarCloud Bugs and Vulnerabilities
for RuntimeExecutor
URL: https://github.com/apache/incubator-nemo/pull/12
This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one from a
fork) once it is merged, the diff is supplied below for the sake of
provenance:
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
index 89284e28..2c493004 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
@@ -207,11 +207,13 @@ public void execute() {
Optional.of(TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE));
LOG.warn("{} Execution Failed (Recoverable)! Exception: {}",
new Object[] {taskGroupId, ex.toString()});
+ Thread.currentThread().interrupt();
} catch (final BlockWriteException ex2) {
taskGroupStateManager.onTaskStateChanged(physicalTaskId,
TaskState.State.FAILED_RECOVERABLE,
Optional.of(TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
LOG.warn("{} Execution Failed (Recoverable)! Exception: {}",
new Object[] {taskGroupId, ex2.toString()});
+ Thread.currentThread().interrupt();
} catch (final Exception e) {
taskGroupStateManager.onTaskStateChanged(
physicalTaskId, TaskState.State.FAILED_UNRECOVERABLE,
Optional.empty());
@@ -290,7 +292,10 @@ private void launchOperatorTask(final OperatorTask
operatorTask) {
}
sideInputMap.put(srcTransform, sideInput);
sideInputIterators.add(sideInputIterator);
- } catch (final InterruptedException | ExecutionException e) {
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new BlockFetchException(e);
+ } catch (final ExecutionException e) {
throw new BlockFetchException(e);
}
});
@@ -347,6 +352,7 @@ private void launchOperatorTask(final OperatorTask
operatorTask) {
}
}
} catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new BlockFetchException(e);
}
@@ -424,6 +430,7 @@ private void launchMetricCollectionBarrierTask(final
MetricCollectionBarrierTask
}
}
} catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new BlockFetchException(e);
}
}
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteInputContext.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteInputContext.java
index e3dc30eb..a2fcd5ec 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteInputContext.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteInputContext.java
@@ -16,11 +16,14 @@
package edu.snu.nemo.runtime.executor.bytetransfer;
import io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
+import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
/**
@@ -34,6 +37,8 @@
*/
public final class ByteInputContext extends ByteTransferContext {
+ private static final Logger LOG =
LoggerFactory.getLogger(ByteInputContext.class.getName());
+
private final CompletableFuture<Iterator<InputStream>> completedFuture = new
CompletableFuture<>();
private final ClosableBlockingQueue<ByteBufInputStream> byteBufInputStreams
= new ClosableBlockingQueue<>();
private volatile ByteBufInputStream currentByteBufInputStream = null;
@@ -44,6 +49,7 @@ public boolean hasNext() {
try {
return byteBufInputStreams.peek() != null;
} catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
@@ -53,7 +59,9 @@ public InputStream next() {
try {
return byteBufInputStreams.take();
} catch (final InterruptedException e) {
- throw new RuntimeException(e);
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted while taking byte buf.", e);
+ throw new NoSuchElementException();
}
}
};
@@ -162,6 +170,7 @@ public int read() throws IOException {
}
return b;
} catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new IOException(e);
}
}
@@ -196,6 +205,7 @@ public int read(final byte[] bytes, final int baseOffset,
final int maxLength) t
}
return readBytes;
} catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new IOException(e);
}
}
@@ -230,6 +240,7 @@ public long skip(final long n) throws IOException {
}
return skippedBytes;
} catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new IOException(e);
}
}
@@ -244,6 +255,7 @@ public int available() throws IOException {
return head.readableBytes();
}
} catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new IOException(e);
}
}
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
index 0f7eae29..fa5116b2 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/bytetransfer/ByteTransport.java
@@ -125,6 +125,7 @@ private ByteTransport(
break;
}
} catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
LOG.debug(String.format("Interrupted while binding to %s:%d", host,
candidatePort), e);
}
}
diff --git
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index d108a194..f340ab0d 100644
---
a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++
b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -409,13 +409,21 @@ public void run() {
|| DataStoreProperty.Value.GlusterFileStore.equals(blockStore)) {
final FileStore fileStore = (FileStore) getBlockStore(blockStore);
for (final FileArea fileArea : fileStore.getFileAreas(blockId,
keyRange)) {
- outputContext.newOutputStream().writeFileArea(fileArea).close();
+ try (final ByteOutputContext.ByteOutputStream stream =
outputContext.newOutputStream()) {
+ stream.writeFileArea(fileArea);
+ }
}
} else {
final Optional<Iterable<SerializedPartition>> optionalResult =
getBlockStore(blockStore)
.getSerializedPartitions(blockId, keyRange);
- for (final SerializedPartition partition : optionalResult.get()) {
-
outputContext.newOutputStream().writeSerializedPartition(partition).close();
+ if (optionalResult.isPresent()) {
+ for (final SerializedPartition partition : optionalResult.get())
{
+ try (final ByteOutputContext.ByteOutputStream stream =
outputContext.newOutputStream()) {
+ stream.writeSerializedPartition(partition);
+ }
+ }
+ } else {
+ throw new IOException("Block is not found!");
}
}
handleUsedData(blockStore, blockId);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services