This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 572c0b8 ATLAS-3090: PC Framework: Improve Existing Producer Consumer Framework 572c0b8 is described below commit 572c0b80da5a2290b3e1f2614e348b0d1d158990 Author: Ashutosh Mestry <ames...@hortonworks.com> AuthorDate: Mon Apr 8 13:09:33 2019 -0700 ATLAS-3090: PC Framework: Improve Existing Producer Consumer Framework Signed-off-by: Sarath Subramanian <ssubraman...@hortonworks.com> --- .../janus/migration/JsonNodeProcessManager.java | 29 ++-- .../java/org/apache/atlas/pc/WorkItemConsumer.java | 54 ++++++-- .../java/org/apache/atlas/pc/WorkItemManager.java | 71 +++++++++- .../org/apache/atlas/pc/WorkItemConsumerTest.java | 4 +- ...t.java => WorkItemConsumerWithResultsTest.java} | 61 +++++---- .../atlas/pc/WorkItemManagerWithResultsTest.java | 150 +++++++++++++++++++++ 6 files changed, 311 insertions(+), 58 deletions(-) diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java index fb1e684..a0a70ab 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/migration/JsonNodeProcessManager.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicLong; public class JsonNodeProcessManager { private static class Consumer extends WorkItemConsumer<JsonNode> { @@ -42,9 +43,9 @@ public class JsonNodeProcessManager { protected final Graph bulkLoadGraph; protected final ParseElement parseElement; private final long batchSize; - private long counter; + private AtomicLong counter; private final MappedElementCache cache; - private final List<JsonNode> nodes = new ArrayList<>(); + private static ThreadLocal<List<JsonNode>> nodes = ThreadLocal.withInitial(() -> new ArrayList<>()); public Consumer(BlockingQueue<JsonNode> workQueue, Graph graph, Graph bulkLoadGraph, ParseElement parseElement, long batchSize) { super(workQueue); @@ -53,7 +54,7 @@ public class JsonNodeProcessManager { this.bulkLoadGraph = bulkLoadGraph; this.parseElement = parseElement; this.batchSize = batchSize; - this.counter = 0; + this.counter = new AtomicLong(0); this.cache = new MappedElementCache(); } @@ -63,8 +64,8 @@ public class JsonNodeProcessManager { Map<String, Object> result = parseElement.parse(bulkLoadGraph, cache, node); if (result == null) { - nodes.add(node); - commitConditionally(counter++); + addNode(node); + commitConditionally(counter.getAndIncrement()); } else { commitBulk(); cache.clearAll(); @@ -77,6 +78,10 @@ public class JsonNodeProcessManager { } } + private void addNode(JsonNode node) { + nodes.get().add(node); + } + @Override protected void commitDirty() { super.commitDirty(); @@ -89,18 +94,18 @@ public class JsonNodeProcessManager { } private void commitConditionally(long index) { - if (index % batchSize == 0 && nodes.size() > 0) { + if (index % batchSize == 0 && nodes.get().size() > 0) { commitBulk(); } } private void commitBulk() { - commit(bulkLoadGraph, nodes.size()); - nodes.clear(); + commit(bulkLoadGraph, nodes.get().size()); + nodes.get().clear(); } private void commitRegular() { - commit(graph, nodes.size()); + commit(graph, nodes.get().size()); cache.clearAll(); } @@ -139,15 +144,15 @@ public class JsonNodeProcessManager { } private void retryBatchCommit() { - display("Waiting with [{} nodes] for 1 secs.", nodes.size()); + display("Waiting with [{} nodes] for 1 secs.", nodes.get().size()); try { Thread.sleep(WAIT_DURATION_AFTER_COMMIT_EXCEPTION); - for (JsonNode n : nodes) { + for (JsonNode n : nodes.get()) { parseElement.parse(bulkLoadGraph, cache, n); } commitBulk(); - display("Done!: After re-adding {}.", nodes.size()); + display("Done!: After re-adding {}.", nodes.get().size()); } catch (Exception ex) { error("retryBatchCommit: Failed! Potential data loss.", ex); } diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java index df2cb67..b7eb4d8 100644 --- a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java +++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java @@ -22,24 +22,31 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; public abstract class WorkItemConsumer<T> implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(WorkItemConsumer.class); private static final int POLLING_DURATION_SECONDS = 5; + private static final int DEFAULT_COMMIT_TIME_IN_MS = 15000; private final BlockingQueue<T> queue; - private boolean isDirty = false; - private long maxCommitTimeInMs = 0; + private AtomicBoolean isDirty = new AtomicBoolean(false); + private AtomicLong maxCommitTimeInMs = new AtomicLong(0); + private CountDownLatch countdownLatch; + private BlockingQueue<Object> results; public WorkItemConsumer(BlockingQueue<T> queue) { this.queue = queue; } public void run() { - while (!Thread.currentThread().isInterrupted()) { - try { + try { + while (!Thread.currentThread().isInterrupted()) { + T item = queue.poll(POLLING_DURATION_SECONDS, TimeUnit.SECONDS); if (item == null) { @@ -47,21 +54,24 @@ public abstract class WorkItemConsumer<T> implements Runnable { return; } - isDirty = true; - + isDirty.set(true); processItem(item); - } catch (InterruptedException e) { - LOG.error("WorkItemConsumer: Interrupted: ", e); } + } catch (InterruptedException e) { + LOG.error("WorkItemConsumer: Interrupted: ", e); + } finally { + maxCommitTimeInMs.set(0); + countdownLatch.countDown(); } } - public long getMaxCommitTimeSeconds() { - return (this.maxCommitTimeInMs > 0 ? this.maxCommitTimeInMs / 1000 : 15); + public long getMaxCommitTimeInMs() { + long commitTime = this.maxCommitTimeInMs.get(); + return ((commitTime > DEFAULT_COMMIT_TIME_IN_MS) ? commitTime : DEFAULT_COMMIT_TIME_IN_MS); } protected void commitDirty() { - if (!isDirty) { + if (!isDirty.get()) { return; } @@ -78,16 +88,32 @@ public abstract class WorkItemConsumer<T> implements Runnable { updateCommitTime((end - start)); - isDirty = false; + isDirty.set(false); } protected abstract void doCommit(); protected abstract void processItem(T item); + protected void addResult(Object value) { + try { + results.put(value); + } catch (InterruptedException e) { + LOG.error("Interrupted while adding result: {}", value); + } + } + protected void updateCommitTime(long commitTime) { - if (this.maxCommitTimeInMs < commitTime) { - this.maxCommitTimeInMs = commitTime; + if (this.maxCommitTimeInMs.get() < commitTime) { + this.maxCommitTimeInMs.set(commitTime); } } + + public void setCountDownLatch(CountDownLatch countdownLatch) { + this.countdownLatch = countdownLatch; + } + + public <V> void setResults(BlockingQueue<Object> queue) { + this.results = queue; + } } diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java index 8ac6f11..0e7d3f2 100644 --- a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java +++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java @@ -17,7 +17,7 @@ */ package org.apache.atlas.pc; - +import org.apache.curator.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,19 +28,51 @@ import java.util.concurrent.*; public class WorkItemManager<T, U extends WorkItemConsumer> { private static final Logger LOG = LoggerFactory.getLogger(WorkItemManager.class); + private final int numWorkers; private final BlockingQueue<T> workQueue; private final ExecutorService service; private final List<U> consumers = new ArrayList<>(); + private CountDownLatch countdownLatch; + private BlockingQueue<Object> resultsQueue; - public WorkItemManager(WorkItemBuilder builder, int batchSize, int numWorkers) { + public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) { + this.numWorkers = numWorkers; workQueue = new LinkedBlockingQueue<>(batchSize * numWorkers); - service = Executors.newFixedThreadPool(numWorkers); + service = Executors.newFixedThreadPool(numWorkers, + new ThreadFactoryBuilder().setNameFormat(namePrefix + "-%d").build()); + + createConsumers(builder, numWorkers, collectResults); + execute(); + } + + public WorkItemManager(WorkItemBuilder builder, int batchSize, int numWorkers) { + this(builder, "workItem", batchSize, numWorkers, false); + } + + public void setResultsCollection(BlockingQueue<Object> resultsQueue) { + this.resultsQueue = resultsQueue; + } + + private void createConsumers(WorkItemBuilder builder, int numWorkers, boolean collectResults) { + if (collectResults) { + setResultsCollection(new LinkedBlockingQueue<>()); + } for (int i = 0; i < numWorkers; i++) { U c = (U) builder.build(workQueue); - - service.submit(c); consumers.add(c); + + if (collectResults) { + c.setResults(resultsQueue); + } + } + } + + private void execute() { + this.countdownLatch = new CountDownLatch(numWorkers); + for (U c : consumers) { + c.setCountDownLatch(countdownLatch); + service.execute(c); } } @@ -52,6 +84,27 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { } } + public void checkProduce(T item) { + if (countdownLatch.getCount() == 0) { + execute(); + } + produce(item); + } + + public void drain() { + try { + if (countdownLatch == null || countdownLatch.getCount() == 0) { + return; + } + + LOG.debug("Drain: Stated! Queue size: {}", workQueue.size()); + this.countdownLatch.await(); + LOG.debug("Drain: Done! Queue size: {}", workQueue.size()); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + public void shutdown() throws InterruptedException { int avgCommitTimeSeconds = getAvgCommitTimeSeconds() * 2; @@ -63,13 +116,17 @@ public class WorkItemManager<T, U extends WorkItemConsumer> { LOG.info("WorkItemManager: Shutdown done!"); } + public BlockingQueue getResults() { + return this.resultsQueue; + } + private int getAvgCommitTimeSeconds() { int commitTimeSeconds = 0; for (U c : consumers) { - commitTimeSeconds += c.getMaxCommitTimeSeconds(); + commitTimeSeconds += c.getMaxCommitTimeInMs(); } - return commitTimeSeconds / consumers.size(); + return (commitTimeSeconds / consumers.size()) / 1000; } } diff --git a/intg/src/test/java/org/apache/atlas/pc/WorkItemConsumerTest.java b/intg/src/test/java/org/apache/atlas/pc/WorkItemConsumerTest.java index 6c88b9e..eac8254 100644 --- a/intg/src/test/java/org/apache/atlas/pc/WorkItemConsumerTest.java +++ b/intg/src/test/java/org/apache/atlas/pc/WorkItemConsumerTest.java @@ -21,6 +21,7 @@ package org.apache.atlas.pc; import org.testng.annotations.Test; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import static org.testng.Assert.assertFalse; @@ -28,12 +29,13 @@ import static org.testng.Assert.assertTrue; public class WorkItemConsumerTest { - private class IntegerConsumerSpy extends WorkItemConsumer<Integer> { + static class IntegerConsumerSpy extends WorkItemConsumer<Integer> { boolean commitDirtyCalled = false; private boolean updateCommitTimeCalled; public IntegerConsumerSpy(BlockingQueue<Integer> queue) { super(queue); + setCountDownLatch(new CountDownLatch(1)); } @Override diff --git a/intg/src/test/java/org/apache/atlas/pc/WorkItemConsumerTest.java b/intg/src/test/java/org/apache/atlas/pc/WorkItemConsumerWithResultsTest.java similarity index 53% copy from intg/src/test/java/org/apache/atlas/pc/WorkItemConsumerTest.java copy to intg/src/test/java/org/apache/atlas/pc/WorkItemConsumerWithResultsTest.java index 6c88b9e..90cb1f7 100644 --- a/intg/src/test/java/org/apache/atlas/pc/WorkItemConsumerTest.java +++ b/intg/src/test/java/org/apache/atlas/pc/WorkItemConsumerWithResultsTest.java @@ -21,16 +21,15 @@ package org.apache.atlas.pc; import org.testng.annotations.Test; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; -import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -public class WorkItemConsumerTest { - +public class WorkItemConsumerWithResultsTest { private class IntegerConsumerSpy extends WorkItemConsumer<Integer> { - boolean commitDirtyCalled = false; - private boolean updateCommitTimeCalled; + int payload = -1; public IntegerConsumerSpy(BlockingQueue<Integer> queue) { super(queue); @@ -38,58 +37,72 @@ public class WorkItemConsumerTest { @Override protected void doCommit() { - + addResult(payload); } @Override protected void processItem(Integer item) { - + payload = item.intValue(); } @Override protected void commitDirty() { - commitDirtyCalled = true; super.commitDirty(); } + } + + private class IntegerConsumerThrowingError extends WorkItemConsumer<Integer> { + int payload = -1; + + public IntegerConsumerThrowingError(BlockingQueue<Integer> queue) { + super(queue); + } @Override - protected void updateCommitTime(long commitTime) { - updateCommitTimeCalled = true; + protected void doCommit() { + throw new NullPointerException(); } - public boolean isCommitDirtyCalled() { - return commitDirtyCalled; + @Override + protected void processItem(Integer item) { + payload = item.intValue(); } - public boolean isUpdateCommitTimeCalled() { - return updateCommitTimeCalled; + @Override + protected void commitDirty() { + super.commitDirty(); } } - @Test - public void callingRunOnEmptyQueueCallsDoesNotCallCommitDirty() { + public void runningConsumerWillPopulateResults() { + CountDownLatch countDownLatch = new CountDownLatch(1); BlockingQueue<Integer> bc = new LinkedBlockingQueue<>(5); + LinkedBlockingQueue<Object> results = new LinkedBlockingQueue<>(); IntegerConsumerSpy ic = new IntegerConsumerSpy(bc); + ic.setResults(results); + ic.setCountDownLatch(countDownLatch); ic.run(); assertTrue(bc.isEmpty()); - assertTrue(ic.isCommitDirtyCalled()); - assertFalse(ic.isUpdateCommitTimeCalled()); + assertEquals(results.size(), bc.size()); + assertEquals(countDownLatch.getCount(), 0); } - @Test - public void runOnQueueRemovesItemFromQueuCallsCommitDirty() { + public void errorInConsumerWillDecrementCountdownLatch() { + CountDownLatch countDownLatch = new CountDownLatch(1); BlockingQueue<Integer> bc = new LinkedBlockingQueue<>(5); - bc.add(1); + LinkedBlockingQueue<Object> results = new LinkedBlockingQueue<>(); - IntegerConsumerSpy ic = new IntegerConsumerSpy(bc); + IntegerConsumerThrowingError ic = new IntegerConsumerThrowingError(bc); + ic.setCountDownLatch(countDownLatch); + ic.setResults(results); ic.run(); assertTrue(bc.isEmpty()); - assertTrue(ic.isCommitDirtyCalled()); - assertTrue(ic.isUpdateCommitTimeCalled()); + assertTrue(results.isEmpty()); + assertEquals(countDownLatch.getCount(), 0); } } diff --git a/intg/src/test/java/org/apache/atlas/pc/WorkItemManagerWithResultsTest.java b/intg/src/test/java/org/apache/atlas/pc/WorkItemManagerWithResultsTest.java new file mode 100644 index 0000000..a12b597 --- /dev/null +++ b/intg/src/test/java/org/apache/atlas/pc/WorkItemManagerWithResultsTest.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.pc; + +import org.apache.commons.lang3.RandomUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class WorkItemManagerWithResultsTest { + private static final Logger LOG = LoggerFactory.getLogger(WorkItemManagerWithResultsTest.class); + + private static class IntegerConsumer extends WorkItemConsumer<Integer> { + private static ThreadLocal<Integer> payload = new ThreadLocal<Integer>(); + + public IntegerConsumer(BlockingQueue<Integer> queue) { + super(queue); + } + + @Override + protected void doCommit() { + if (getPayload() == -1) { + LOG.debug("Skipping:"); + return; + } + + incrementPayload(100); + addResult(getPayload()); + + setPayload(0); + } + + @Override + protected void processItem(Integer item) { + try { + setPayload(item.intValue()); + Thread.sleep(20 + RandomUtils.nextInt(5, 7)); + super.commit(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public void setPayload(int v) { + payload.set(v); + } + + public int getPayload() { + return payload.get(); + } + + public void incrementPayload(int v) { + payload.set(payload.get() + v); + } + } + + private class IntegerConsumerBuilder implements WorkItemBuilder<IntegerConsumer, Integer> { + @Override + public IntegerConsumer build(BlockingQueue<Integer> queue) { + return new IntegerConsumer(queue); + } + } + + private WorkItemManager<Integer, WorkItemConsumer> getWorkItemManger(IntegerConsumerBuilder cb, int numWorkers) { + return new WorkItemManager<>(cb, "IntegerConsumer", 5, numWorkers, true); + } + + @Test + public void drainTest() throws InterruptedException { + final int maxItems = 50; + + IntegerConsumerBuilder cb = new IntegerConsumerBuilder(); + WorkItemManager<Integer, WorkItemConsumer> wi = getWorkItemManger(cb, 5); + + for (int i = 0; i < maxItems; i++) { + wi.produce(i); + } + + wi.drain(); + assertEquals(wi.getResults().size(), maxItems); + Set<Integer> set = new HashSet<Integer>(wi.getResults()); + assertEquals(set.size(), maxItems); + + wi.shutdown(); + } + + @Test + public void drainCheckProduceTest() throws InterruptedException { + IntegerConsumerBuilder cb = new IntegerConsumerBuilder(); + WorkItemManager<Integer, WorkItemConsumer> wi = getWorkItemManger(cb, 2); + + for (int i = 0; i < 5; i++) { + repeatedDrainAndProduce(i, wi); + } + + wi.shutdown(); + } + + private void repeatedDrainAndProduce(int runCount, WorkItemManager<Integer, WorkItemConsumer> wi) { + final int maxItems = 100; + int halfWay = maxItems / 2; + + LOG.info("Run: {}", runCount); + wi.getResults().clear(); + + for (int i = 0; i < maxItems; i++) { + if (i == halfWay) { + wi.drain(); + + Set<Integer> set = new HashSet<Integer>(wi.getResults()); + assertEquals(wi.getResults().size(), halfWay, "halfWay: total count"); + assertEquals(set.size(), halfWay, "halfWay: set match"); + } + + wi.checkProduce(i); + } + + wi.drain(); + assertEquals(wi.getResults().size(), maxItems, "total count"); + Set<Integer> set = new HashSet<Integer>(wi.getResults()); + assertEquals(set.size(), maxItems, "set count"); + + for (int i = 100; i < 100 + maxItems; i++) { + assertTrue(set.contains(i), "Could not test: " + i); + } + } +}