belugabehr commented on a change in pull request #1280:
URL: https://github.com/apache/hive/pull/1280#discussion_r457496282
##########
File path: storage-api/src/java/org/apache/hive/common/util/BloomKFilter.java
##########
@@ -527,20 +527,19 @@ public void add(ElementWrapper wrapper) {
@Override
public void run() {
while (!executor.isTerminated() && !queue.isEmpty()) {
Review comment:
A bit unrelated, but since you're touching this code, note that this check
serves no purpose:
```
while (!executor.isTerminated() && !queue.isEmpty()) {
...
}
```
I cannot think of many scenarios where a thread needs to check the state
of its own `ExecutorService`. `isTerminated()` cannot return true while this
task is still running, and if the `ExecutorService` is stopped with
`shutdownNow()`, it interrupts every thread in the pool, which should cause the
task to stop running.
Also, checking whether the `Queue` is empty is improper. Two threads can check
the queue when only one item is left (size = 1), both see a non-empty queue, and
both try to read. Both should just call `take`: one will get the item and the
other will block until another item arrives or it is interrupted.
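To illustrate, here is a minimal sketch of a worker loop that only blocks on `take()` and treats interruption as its exit signal. It is not the `BloomKFilter` code: `TakeLoopSketch`, `Worker`, and `demo` are hypothetical names, the queue holds plain integers instead of `ElementWrapper`, and the `CountDownLatch` exists only so the demo knows when every item has been consumed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a merge worker; all names here are illustrative.
class TakeLoopSketch {
  static final class Worker implements Runnable {
    private final BlockingQueue<Integer> queue;
    private final AtomicInteger sum;
    private final CountDownLatch processed;

    Worker(BlockingQueue<Integer> queue, AtomicInteger sum, CountDownLatch processed) {
      this.queue = queue;
      this.sum = sum;
      this.processed = processed;
    }

    @Override
    public void run() {
      try {
        while (true) {
          // Blocks until an item arrives; no isEmpty() or isTerminated() check.
          int value = queue.take();
          sum.addAndGet(value);
          processed.countDown();
        }
      } catch (InterruptedException e) {
        // shutdownNow() interrupts pool threads; restore the flag and exit.
        Thread.currentThread().interrupt();
      }
    }
  }

  // Feeds 1..100 to two workers and returns the combined sum.
  static int demo() {
    int items = 100;
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    AtomicInteger sum = new AtomicInteger();
    CountDownLatch processed = new CountDownLatch(items);
    ExecutorService executor = Executors.newFixedThreadPool(2);
    executor.execute(new Worker(queue, sum, processed));
    executor.execute(new Worker(queue, sum, processed));
    try {
      for (int i = 1; i <= items; i++) {
        queue.put(i);
      }
      processed.await();       // every item has been consumed
      executor.shutdownNow();  // interrupts both workers blocked in take()
      executor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      executor.shutdownNow();
      Thread.currentThread().interrupt();
      throw new IllegalStateException("demo interrupted", e);
    }
    return sum.get();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints 5050
  }
}
```

With this shape, neither worker can act on a stale emptiness check, because each item is handed to exactly one `take()` call.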
##########
File path: storage-api/src/java/org/apache/hive/common/util/BloomKFilter.java
##########
@@ -527,20 +527,19 @@ public void add(ElementWrapper wrapper) {
@Override
public void run() {
while (!executor.isTerminated() && !queue.isEmpty()) {
- ElementWrapper currentBf = queue.poll();
+ ElementWrapper currentBf = null;
+ try {
+ currentBf = queue.take();
+ } catch (InterruptedException e) {
Review comment:
Do not ignore this exception. An interrupt means that it's time to exit.
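Something along these lines is what I have in mind; a rough sketch only, where `InterruptExitSketch`, `merge`, and `demo` are made-up names and the queue carries raw `byte[]` values rather than `ElementWrapper`. The catch block restores the interrupt flag and returns instead of swallowing the exception and looping again.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical example; not the actual BloomFilterMergeWorker.
class InterruptExitSketch {
  // Placeholder for the real merge step.
  static void merge(byte[] item) {
    // no-op in this sketch
  }

  // Returns true if the worker thread stopped after being interrupted.
  static boolean demo() {
    BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(4);
    AtomicBoolean exitedOnInterrupt = new AtomicBoolean(false);

    Thread worker = new Thread(() -> {
      while (true) {
        byte[] item;
        try {
          item = queue.take();
        } catch (InterruptedException e) {
          // Restore the flag for callers further up, then stop working.
          Thread.currentThread().interrupt();
          exitedOnInterrupt.set(true);
          return;
        }
        merge(item);
      }
    });

    worker.start();
    worker.interrupt(); // the queue is empty, so take() throws InterruptedException
    try {
      worker.join(5000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    return exitedOnInterrupt.get() && !worker.isAlive();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints true
  }
}
```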
##########
File path: storage-api/src/java/org/apache/hive/common/util/BloomKFilter.java
##########
@@ -506,18 +505,19 @@ public ElementWrapper(byte[] bytes, int start, int length, int modifiedStart, in
}
private static class BloomFilterMergeWorker implements Runnable {
- Queue<ElementWrapper> queue = new LinkedBlockingDeque<>();
+ ArrayBlockingQueue<ElementWrapper> queue;
Review comment:
Use the more general `BlockingQueue` interface type here.
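A small sketch of the idea, assuming the capacity comes from the new `batchSize` constructor argument: the field is declared against the interface, and only the constructor names `ArrayBlockingQueue`. `QueueFieldSketch`, `offer`, and `pending` are hypothetical names used for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical holder class; only the field declaration is the point.
class QueueFieldSketch {
  // Typed as the interface, so the implementation can change in one place.
  private final BlockingQueue<byte[]> queue;

  QueueFieldSketch(int batchSize) {
    // Assumption: batchSize is used as the bounded queue's capacity.
    this.queue = new ArrayBlockingQueue<>(batchSize);
  }

  // Non-blocking enqueue; returns false when the bounded queue is full.
  boolean offer(byte[] element) {
    return queue.offer(element);
  }

  // Number of elements currently waiting in the queue.
  int pending() {
    return queue.size();
  }

  public static void main(String[] args) {
    QueueFieldSketch sketch = new QueueFieldSketch(2);
    System.out.println(sketch.offer(new byte[1])); // prints true
    System.out.println(sketch.offer(new byte[1])); // prints true
    System.out.println(sketch.offer(new byte[1])); // prints false
  }
}
```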
##########
File path: storage-api/src/java/org/apache/hive/common/util/BloomKFilter.java
##########
@@ -506,18 +505,19 @@ public ElementWrapper(byte[] bytes, int start, int length, int modifiedStart, in
}
private static class BloomFilterMergeWorker implements Runnable {
- Queue<ElementWrapper> queue = new LinkedBlockingDeque<>();
+ ArrayBlockingQueue<ElementWrapper> queue;
private ExecutorService executor;
private byte[] bfAggregation;
private int bfAggregationStart;
private int bfAggregationLength;
- public BloomFilterMergeWorker(ExecutorService executor, byte[] bfAggregation, int bfAggregationStart, int bfAggregationLength) {
+ public BloomFilterMergeWorker(ExecutorService executor, int batchSize, byte[] bfAggregation, int bfAggregationStart, int bfAggregationLength) {
this.executor = executor;
Review comment:
Do not capture this value.