GitHub user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2646#discussion_r212353990
--- Diff:
processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
---
@@ -45,17 +45,21 @@
private RowBatch readBatch;
+ private static Object lockObject = new Object();
+
private ArrayBlockingQueue<RowBatch> queue = new
ArrayBlockingQueue<>(10);
public void write(Object[] row) throws InterruptedException {
if (close) {
// already might be closed forcefully
return;
}
- if (!loadBatch.addRow(row)) {
- loadBatch.readyRead();
- queue.put(loadBatch);
- loadBatch = new RowBatch(batchSize);
+ synchronized (lockObject) {
--- End diff --
I feel that loadBatch should be a parameter of this function, so that the
caller can accumulate rows into a batch in its own thread and then call
write to add the completed batch to the shared queue.
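A minimal Java sketch of the suggested shape, under stated assumptions: RowBatch below is a hypothetical, simplified stand-in for CarbonData's class (not its real implementation), and BatchQueueSketch, produce and drainRowCount are illustrative names that do not come from the PR. Each producer thread fills its own local batch and only passes complete batches to write, which relies on ArrayBlockingQueue.put already being thread-safe rather than on a shared lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class BatchQueueSketch {

  // Hypothetical, simplified stand-in for CarbonData's RowBatch (not the real class).
  static final class RowBatch {
    private final List<Object[]> rows = new ArrayList<>();
    private final int capacity;

    RowBatch(int capacity) {
      this.capacity = capacity;
    }

    // Adds a row; returns false once the batch has reached its capacity.
    boolean addRow(Object[] row) {
      rows.add(row);
      return rows.size() < capacity;
    }

    int size() {
      return rows.size();
    }
  }

  // Shared queue, same capacity as in the diff.
  private final ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10);
  private volatile boolean close;

  // The batch is a parameter: the caller built it in its own thread, and
  // ArrayBlockingQueue.put is thread-safe, so no extra lock is taken here.
  public void write(RowBatch batch) throws InterruptedException {
    if (close) {
      // already might be closed forcefully
      return;
    }
    queue.put(batch);
  }

  public void close() {
    close = true;
  }

  // Hypothetical producer loop run by each caller thread:
  // accumulate rows locally, publish the batch when it is full.
  void produce(int rowCount, int batchSize) throws InterruptedException {
    RowBatch batch = new RowBatch(batchSize);
    for (int i = 0; i < rowCount; i++) {
      if (!batch.addRow(new Object[] {i})) {
        write(batch);
        batch = new RowBatch(batchSize);
      }
    }
    if (batch.size() > 0) {
      write(batch); // flush the last, partially filled batch
    }
  }

  // Drains whatever is currently queued and returns the total row count.
  int drainRowCount() {
    List<RowBatch> drained = new ArrayList<>();
    queue.drainTo(drained);
    int total = 0;
    for (RowBatch b : drained) {
      total += b.size();
    }
    return total;
  }

  public static void main(String[] args) throws InterruptedException {
    BatchQueueSketch wrapper = new BatchQueueSketch();
    Runnable producer = () -> {
      try {
        wrapper.produce(25, 10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    Thread t1 = new Thread(producer);
    Thread t2 = new Thread(producer);
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println(wrapper.drainRowCount()); // prints 50
  }
}
```

With two producers of 25 rows each and a batch size of 10, each thread enqueues three batches (10, 10 and 5 rows), so main prints 50. Since no batch is shared between threads in this sketch, the write path needs no synchronized block, static or otherwise.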
---