This is an automated email from the ASF dual-hosted git repository. amestry pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
commit cc601d7371fae1dbc16b55d1ca84f06b745700dc Author: Ashutosh Mestry <[email protected]> AuthorDate: Mon Mar 2 09:17:41 2020 -0800 ATLAS-3642: PC fx: WorkItemManager getResults Modification. --- intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java | 11 ++++------- intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java | 9 +++++---- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java index 9ba4bf4..dd76697 100644 --- a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java +++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java @@ -21,6 +21,7 @@ package org.apache.atlas.pc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -37,7 +38,7 @@ public abstract class WorkItemConsumer<T> implements Runnable { private final AtomicBoolean isDirty = new AtomicBoolean(false); private final AtomicLong maxCommitTimeInMs = new AtomicLong(DEFAULT_COMMIT_TIME_IN_MS); private CountDownLatch countdownLatch; - private BlockingQueue<Object> results; + private Queue<Object> results; public WorkItemConsumer(BlockingQueue<T> queue) { this.queue = queue; @@ -101,11 +102,7 @@ public abstract class WorkItemConsumer<T> implements Runnable { protected abstract void processItem(T item); protected void addResult(Object value) { - try { - results.put(value); - } catch (InterruptedException e) { - LOG.error("Interrupted while adding result: {}", value); - } + results.add(value); } protected void updateCommitTime(long commitTime) { @@ -118,7 +115,7 @@ public abstract class WorkItemConsumer<T> implements Runnable { this.countdownLatch = countdownLatch; } - public <V> void setResults(BlockingQueue<Object> queue) { + public <V> void setResults(Queue<Object> queue) { this.results = queue; } } diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java index a7ba67c..351421e 100644 --- a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java +++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; +import java.util.Queue; import java.util.concurrent.*; public class WorkItemManager<T, U extends WorkItemConsumer> { @@ -33,7 +34,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { private final ExecutorService service; private final List<U> consumers = new ArrayList<>(); private CountDownLatch countdownLatch; - private BlockingQueue<Object> resultsQueue; + private Queue<Object> resultsQueue; public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) { this.numWorkers = numWorkers; @@ -49,13 +50,13 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { this(builder, "workItemConsumer", batchSize, numWorkers, false); } - public void setResultsCollection(BlockingQueue<Object> resultsQueue) { + public void setResultsCollection(Queue<Object> resultsQueue) { this.resultsQueue = resultsQueue; } private void createConsumers(WorkItemBuilder builder, int numWorkers, boolean collectResults) { if (collectResults) { - setResultsCollection(new LinkedBlockingQueue<>()); + setResultsCollection(new ConcurrentLinkedQueue<>()); } for (int i = 0; i < numWorkers; i++) { @@ -124,7 +125,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { LOG.info("WorkItemManager: Shutdown done!"); } - public BlockingQueue getResults() { + public Queue getResults() { return this.resultsQueue; }
