Repository: carbondata Updated Branches: refs/heads/master 500654e60 -> 06b0d0816
[CARBONDATA-PROCESSING]Fix findbugs issues in carbondata-processing module Fix findbugs issues in carbondata-processing module This closes #1268 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/06b0d081 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/06b0d081 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/06b0d081 Branch: refs/heads/master Commit: 06b0d081691c6d17d099557dbbe1ee766e9de0d4 Parents: 500654e Author: Raghunandan S <[email protected]> Authored: Fri Aug 18 18:14:38 2017 +0530 Committer: Jacky Li <[email protected]> Committed: Sat Aug 19 09:57:23 2017 +0800 ---------------------------------------------------------------------- .../processing/datatypes/PrimitiveDataType.java | 6 -- .../processing/merger/CarbonDataMergerUtil.java | 30 ++---- .../merger/CarbonDataMergerUtilResult.java | 7 ++ .../impl/MeasureFieldConverterImpl.java | 2 + .../processing/newflow/row/CarbonRowBatch.java | 11 +- .../newflow/sort/SortStepRowUtil.java | 7 +- .../sort/impl/ParallelReadMergeSorterImpl.java | 9 +- ...arallelReadMergeSorterWithBucketingImpl.java | 10 +- .../UnsafeBatchParallelReadMergeSorterImpl.java | 15 +-- .../impl/UnsafeParallelReadMergeSorterImpl.java | 10 +- ...arallelReadMergeSorterWithBucketingImpl.java | 24 +++-- .../sort/unsafe/UnsafeCarbonRowPage.java | 9 ++ .../newflow/sort/unsafe/UnsafeSortDataRows.java | 17 +-- .../holder/UnsafeFinalMergePageHolder.java | 17 +++ .../unsafe/holder/UnsafeInmemoryHolder.java | 18 ++++ .../holder/UnsafeInmemoryMergeHolder.java | 17 +++ .../holder/UnsafeSortTempFileChunkHolder.java | 11 +- .../UnsafeInMemoryIntermediateDataMerger.java | 7 +- .../merger/UnsafeIntermediateFileMerger.java | 8 +- .../unsafe/merger/UnsafeIntermediateMerger.java | 4 +- .../newflow/steps/DummyClassForTest.java | 103 ------------------- .../sortdata/IntermediateFileMerger.java | 10 +- .../sortandgroupby/sortdata/SortDataRows.java | 20 ++-- .../sortdata/SortIntermediateFileMerger.java | 2 +- .../sortdata/SortTempFileChunkHolder.java | 10 +- .../store/CarbonFactDataHandlerColumnar.java | 6 +- .../store/SingleThreadFinalSortFilesMerger.java | 32 +++--- 27 files changed, 194 insertions(+), 228 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java index e7e48e9..729f9e3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java @@ -71,11 +71,6 @@ public class PrimitiveDataType implements GenericDataType<Object> { private String columnId; /** - * dimension ordinal of primitive type column - */ - private int dimensionOrdinal; - - /** * key size */ private int keySize; @@ -105,7 +100,6 @@ public class PrimitiveDataType implements GenericDataType<Object> { this.name = name; this.parentname = parentname; this.columnId = columnId; - this.dimensionOrdinal = dimensionOrdinal; } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index ce9c433..86e9eff 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -169,8 +169,7 @@ public final class CarbonDataMergerUtil { String timestamp = "" + carbonLoadModel.getFactTimeStamp(); - List<String> updatedDeltaFilesList = - new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + List<String> updatedDeltaFilesList = null; // This routine updateLoadMetadataIUDCompactionMergeStatus is suppose to update // two files as it is only called during IUD_UPDDEL_DELTA_COMPACTION. Along with @@ -259,23 +258,16 @@ public final class CarbonDataMergerUtil { } } - try { - segmentUpdateStatusManager - .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp); - segmentStatusManager - .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), loadDetails); - status = true; - } catch (IOException e) { - LOGGER.error( - "Error while writing metadata. The metadata file path is " + carbonTablePath - .getMetadataDirectoryPath()); - status = false; - } + segmentUpdateStatusManager + .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp); + segmentStatusManager + .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), loadDetails); + status = true; } else { LOGGER.error("Not able to acquire the lock."); status = false; } - } catch (Exception e) { + } catch (IOException e) { LOGGER.error("Error while updating metadata. The metadata file path is " + carbonTablePath .getMetadataDirectoryPath()); status = false; @@ -456,13 +448,7 @@ public final class CarbonDataMergerUtil { @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) { double seg1Id = Double.parseDouble(seg1.getLoadName()); double seg2Id = Double.parseDouble(seg2.getLoadName()); - if (seg1Id - seg2Id < 0) { - return -1; - } - if (seg1Id - seg2Id > 0) { - return 1; - } - return 0; + return Double.compare(seg1Id, seg2Id); } }); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java index aa3d801..cf1e22d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java @@ -30,4 +30,11 @@ public final class CarbonDataMergerUtilResult extends SegmentUpdateDetails { compactionStatus = status; } + @Override public boolean equals(Object obj) { + return super.equals(obj); + } + + @Override public int hashCode() { + return super.hashCode(); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java index 58fa88e..8e20b8f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java @@ -65,6 +65,8 @@ public class MeasureFieldConverterImpl implements FieldConverter { if (value == null || isNull) { String message = logHolder.getColumnMessageMap().get(measure.getColName()); if (null == message) { + message = CarbonDataProcessorUtil + .prepareFailureReason(measure.getColName(), measure.getDataType()); logHolder.getColumnMessageMap().put(measure.getColName(), message); } row.update(null, index); http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java index e8eb071..1de55e0 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java @@ -17,9 +17,12 @@ package org.apache.carbondata.processing.newflow.row; +import java.util.NoSuchElementException; + import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.core.datastore.row.CarbonRow; + /** * Batch of rows. */ @@ -47,8 +50,12 @@ public class CarbonRowBatch extends CarbonIterator<CarbonRow> { return index < size; } - @Override public CarbonRow next() { - return rowBatch[index++]; + @Override + public CarbonRow next() throws NoSuchElementException { + if (hasNext()) { + return rowBatch[index++]; + } + throw new NoSuchElementException("no more elements to iterate"); } @Override public void remove() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java index 7857f4e..5238c3c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java @@ -58,12 +58,7 @@ public class SortStepRowUtil { // read measure values for (int i = 0; i < measureCount; i++) { - if (needConvertDecimalToByte) { - measures[index++] = data[allCount]; - } else { - measures[index++] = data[allCount]; - } - + measures[index++] = data[allCount]; allCount++; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java index c3cf3c0..5a8a2c8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java @@ -18,7 +18,6 @@ package org.apache.carbondata.processing.newflow.sort.impl; import java.io.File; import java.util.Iterator; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -100,7 +99,7 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter { try { for (int i = 0; i < iterators.length; i++) { - executorService.submit( + executorService.execute( new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter, threadStatusObserver)); } @@ -183,7 +182,7 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter { /** * This thread iterates the iterator and adds the rows to @{@link SortDataRows} */ - private static class SortIteratorThread implements Callable<Void> { + private static class SortIteratorThread implements Runnable { private Iterator<CarbonRowBatch> iterator; @@ -206,7 +205,7 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter { } @Override - public Void call() throws CarbonDataLoadingException { + public void run() { try { while (iterator.hasNext()) { CarbonRowBatch batch = iterator.next(); @@ -225,9 +224,7 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter { } catch (Exception e) { LOGGER.error(e); observer.notifyFailed(e); - throw new CarbonDataLoadingException(e); } - return null; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java index 851c384..7e013e0 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java @@ -18,7 +18,6 @@ package org.apache.carbondata.processing.newflow.sort.impl; import java.io.File; import java.util.Iterator; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -104,7 +103,7 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte final int batchSize = CarbonProperties.getInstance().getBatchSize(); try { for (int i = 0; i < iterators.length; i++) { - executorService.submit(new SortIteratorThread(iterators[i], sortDataRows, rowCounter, + executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, rowCounter, this.threadStatusObserver)); } executorService.shutdown(); @@ -197,7 +196,7 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte /** * This thread iterates the iterator and adds the rows to @{@link SortDataRows} */ - private static class SortIteratorThread implements Callable<Void> { + private static class SortIteratorThread implements Runnable { private Iterator<CarbonRowBatch> iterator; @@ -215,7 +214,8 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte this.threadStatusObserver = observer; } - @Override public Void call() throws CarbonDataLoadingException { + @Override + public void run() { try { while (iterator.hasNext()) { CarbonRowBatch batch = iterator.next(); @@ -234,9 +234,7 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte } catch (Exception e) { LOGGER.error(e); this.threadStatusObserver.notifyFailed(e); - throw new CarbonDataLoadingException(e); } - return null; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java index ebb85b4..056c96b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java @@ -20,7 +20,6 @@ import java.io.File; import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -83,7 +82,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter try { for (int i = 0; i < iterators.length; i++) { - executorService.submit( + executorService.execute( new SortIteratorThread(iterators[i], sortBatchHolder, batchSize, rowCounter, this.threadStatusObserver)); } @@ -118,7 +117,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter /** * This thread iterates the iterator and adds the rows */ - private static class SortIteratorThread implements Callable<Void> { + private static class SortIteratorThread implements Runnable { private Iterator<CarbonRowBatch> iterator; @@ -139,7 +138,8 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter this.threadStatusObserver = threadStatusObserver; } - @Override public Void call() throws CarbonDataLoadingException { + @Override + public void run() { try { while (iterator.hasNext()) { CarbonRowBatch batch = iterator.next(); @@ -164,11 +164,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter } catch (Exception e) { LOGGER.error(e); this.threadStatusObserver.notifyFailed(e); - throw new CarbonDataLoadingException(e); } finally { sortDataRows.finishThread(); } - return null; } } @@ -283,7 +281,10 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter // thread from waiting. if (finalMerger != null) { finalMerger.setStopProcess(true); - mergerQueue.offer(finalMerger); + boolean offered = mergerQueue.offer(finalMerger); + if (!offered) { + throw new CarbonDataLoadingException(e); + } } throw new CarbonDataLoadingException(e); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java index ad4ebfc..a0d43ba 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java @@ -18,7 +18,6 @@ package org.apache.carbondata.processing.newflow.sort.impl; import java.util.Iterator; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -88,7 +87,7 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter { try { for (int i = 0; i < iterators.length; i++) { - executorService.submit( + executorService.execute( new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter, this.threadStatusObserver)); } @@ -168,7 +167,7 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter { /** * This thread iterates the iterator and adds the rows */ - private static class SortIteratorThread implements Callable<Void> { + private static class SortIteratorThread implements Runnable { private Iterator<CarbonRowBatch> iterator; @@ -190,7 +189,8 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter { this.threadStatusObserver = threadStatusObserver; } - @Override public Void call() throws CarbonDataLoadingException { + @Override + public void run() { try { while (iterator.hasNext()) { CarbonRowBatch batch = iterator.next(); @@ -209,9 +209,7 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter { } catch (Exception e) { LOGGER.error(e); this.threadStatusObserver.notifyFailed(e); - throw new CarbonDataLoadingException(e); } - return null; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java index f000619..54e0180 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java @@ -20,7 +20,6 @@ package org.apache.carbondata.processing.newflow.sort.impl; import java.io.File; import java.util.Iterator; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -37,7 +36,7 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.processing.newflow.DataField; import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; -import org.apache.carbondata.processing.newflow.sort.Sorter; +import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter; import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage; import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows; import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger; @@ -52,7 +51,7 @@ import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; * This step is specifically for bucketing, it sorts each bucket data separately and write to * temp files. */ -public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter { +public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter { private static final LogService LOGGER = LogServiceFactory.getLogService( @@ -95,17 +94,21 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter { throw new CarbonDataLoadingException(e); } ExecutorService executorService = Executors.newFixedThreadPool(iterators.length); + this.threadStatusObserver = new ThreadStatusObserver(executorService); final int batchSize = CarbonProperties.getInstance().getBatchSize(); try { for (int i = 0; i < iterators.length; i++) { - executorService.submit(new SortIteratorThread(iterators[i], sortDataRows)); + executorService.execute(new SortIteratorThread(iterators[i], sortDataRows, this + .threadStatusObserver)); } executorService.shutdown(); executorService.awaitTermination(2, TimeUnit.DAYS); processRowToNextStep(sortDataRows, sortParameters); } catch (Exception e) { + checkError(); throw new CarbonDataLoadingException("Problem while shutdown the server ", e); } + checkError(); try { for (int i = 0; i < intermediateFileMergers.length; i++) { intermediateFileMergers[i].finish(); @@ -180,19 +183,23 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter { /** * This thread iterates the iterator and adds the rows to @{@link UnsafeSortDataRows} */ - private static class SortIteratorThread implements Callable<Void> { + private static class SortIteratorThread implements Runnable { private Iterator<CarbonRowBatch> iterator; private UnsafeSortDataRows[] sortDataRows; + private ThreadStatusObserver threadStatusObserver; + public SortIteratorThread(Iterator<CarbonRowBatch> iterator, - UnsafeSortDataRows[] sortDataRows) { + UnsafeSortDataRows[] sortDataRows, ThreadStatusObserver threadStatusObserver) { this.iterator = iterator; this.sortDataRows = sortDataRows; + this.threadStatusObserver = threadStatusObserver; } - @Override public Void call() throws CarbonDataLoadingException { + @Override + public void run() { try { while (iterator.hasNext()) { CarbonRowBatch batch = iterator.next(); @@ -209,9 +216,8 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter { } } catch (Exception e) { LOGGER.error(e); - throw new CarbonDataLoadingException(e); + this.threadStatusObserver.notifyFailed(e); } - return null; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java index 20b60c9..8b23437 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java @@ -157,6 +157,9 @@ public class UnsafeCarbonRowPage { address + size, bigDecimalInBytes.length); size += bigDecimalInBytes.length; break; + default: + throw new IllegalArgumentException("unsupported data type:" + + measureDataType[mesCount]); } set(nullSetWords, mesCount); } else { @@ -240,6 +243,9 @@ public class UnsafeCarbonRowPage { size += bigDecimalInBytes.length; rowToFill[dimensionSize + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes); break; + default: + throw new IllegalArgumentException("unsupported data type:" + + measureDataType[mesCount]); } } else { rowToFill[dimensionSize + mesCount] = null; @@ -326,6 +332,9 @@ public class UnsafeCarbonRowPage { stream.writeShort(aShort); stream.write(bigDecimalInBytes); break; + default: + throw new IllegalArgumentException("unsupported data type:" + + measureDataType[mesCount]); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java index d0bb4f6..dda0d89 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java @@ -23,7 +23,6 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.util.Random; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @@ -102,7 +101,7 @@ public class UnsafeSortDataRows { this.threadStatusObserver = new ThreadStatusObserver(); this.taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId(); this.inMemoryChunkSize = inMemoryChunkSize; - this.inMemoryChunkSize = inMemoryChunkSize * 1024 * 1024; + this.inMemoryChunkSize = inMemoryChunkSize * 1024L * 1024L; enableInMemoryIntermediateMerge = Boolean.parseBoolean(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT, CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT)); @@ -186,7 +185,7 @@ public class UnsafeSortDataRows { } unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible(); semaphore.acquire(); - dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage)); + dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(rowPage)); MemoryBlock memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize); boolean saveToDisk = @@ -342,14 +341,15 @@ public class UnsafeSortDataRows { * This class is responsible for sorting and writing the object * array which holds the records equal to given array size */ - private class DataSorterAndWriter implements Callable<Void> { + private class DataSorterAndWriter implements Runnable { private UnsafeCarbonRowPage page; public DataSorterAndWriter(UnsafeCarbonRowPage rowPage) { this.page = rowPage; } - @Override public Void call() throws Exception { + @Override + public void run() { try { long startTime = System.currentTimeMillis(); TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>( @@ -399,11 +399,14 @@ public class UnsafeSortDataRows { + (System.currentTimeMillis() - startTime)); } } catch (Throwable e) { - threadStatusObserver.notifyFailed(e); + try { + threadStatusObserver.notifyFailed(e); + } catch (CarbonSortKeyAndGroupByException ex) { + LOGGER.error(e); + } } finally { semaphore.release(); } - return null; } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java index 397de63..f00dd45 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeFinalMergePageHolder.java @@ -76,6 +76,23 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder { return comparator.compare(currentRow, o.getRow()); } + @Override public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (!(obj instanceof UnsafeFinalMergePageHolder)) { + return false; + } + + UnsafeFinalMergePageHolder o = (UnsafeFinalMergePageHolder) obj; + return this == o; + } + + @Override public int hashCode() { + return super.hashCode(); + } + public int numberOfRows() { return actualSize; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java index 048f4f8..20d9894 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryHolder.java @@ -72,6 +72,24 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder { return comparator.compare(currentRow, o.getRow()); } + @Override public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (!(obj instanceof UnsafeInmemoryHolder)) { + return false; + } + + UnsafeInmemoryHolder o = (UnsafeInmemoryHolder)obj; + + return this == o; + } + + @Override public int hashCode() { + return super.hashCode(); + } + public int numberOfRows() { return actualSize; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java index 390dbf5..fa4534f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeInmemoryMergeHolder.java @@ -78,6 +78,23 @@ public class UnsafeInmemoryMergeHolder implements Comparable<UnsafeInmemoryMerge return comparator.compare(currentRow, baseObject, o.getRow(), o.getBaseObject()); } + @Override public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (!(obj instanceof UnsafeInmemoryMergeHolder)) { + return false; + } + + UnsafeInmemoryMergeHolder o = (UnsafeInmemoryMergeHolder)obj; + return this == o; + } + + @Override public int hashCode() { + return super.hashCode(); + } + public Object getBaseObject() { return baseObject; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java index 7fb9b6e..f5316e6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java @@ -344,11 +344,14 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { stream.readFully(bigDecimalInBytes); row[dimensionCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes); break; + default: + throw new IllegalArgumentException("unsupported data type:" + + measureDataType[mesCount]); } } } return row; - } catch (Exception e) { + } catch (IOException e) { throw new CarbonSortKeyAndGroupByException(e); } } @@ -397,12 +400,16 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { } @Override public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof UnsafeSortTempFileChunkHolder)) { return false; } UnsafeSortTempFileChunkHolder o = (UnsafeSortTempFileChunkHolder) obj; - return o.compareTo(o) == 0; + return this == o; } @Override public int hashCode() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java index 6adbac8..5480838 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeInMemoryIntermediateDataMerger.java @@ -19,7 +19,6 @@ package org.apache.carbondata.processing.newflow.sort.unsafe.merger; import java.util.AbstractQueue; import java.util.PriorityQueue; -import java.util.concurrent.Callable; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -28,7 +27,7 @@ import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonR import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeInmemoryMergeHolder; import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; -public class UnsafeInMemoryIntermediateDataMerger implements Callable<Void> { +public class UnsafeInMemoryIntermediateDataMerger implements Runnable { /** * LOGGER */ @@ -68,7 +67,8 @@ public class UnsafeInMemoryIntermediateDataMerger implements Callable<Void> { this.entryCount = 0; } - @Override public Void call() throws Exception { + @Override + public void run() { long intermediateMergeStartTime = System.currentTimeMillis(); int holderCounterConst = holderCounter; try { @@ -83,7 +83,6 @@ public class UnsafeInMemoryIntermediateDataMerger implements Callable<Void> { } catch (Exception e) { LOGGER.error(e, "Problem while intermediate merging"); } - return null; } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java index c67e093..63f6aab 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java @@ -27,7 +27,6 @@ import java.nio.ByteBuffer; import java.util.AbstractQueue; import java.util.Arrays; import java.util.PriorityQueue; -import java.util.concurrent.Callable; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -41,7 +40,7 @@ import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters; import org.apache.carbondata.processing.sortandgroupby.sortdata.TempSortFileWriter; import org.apache.carbondata.processing.sortandgroupby.sortdata.TempSortFileWriterFactory; -public class UnsafeIntermediateFileMerger implements Callable<Void> { +public class UnsafeIntermediateFileMerger implements Runnable { /** * LOGGER */ @@ -100,7 +99,8 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> { rowData = ByteBuffer.allocate(2 * 1024 * 1024); } - @Override public Void call() throws Exception { + @Override + public void run() { long intermediateMergeStartTime = System.currentTimeMillis(); int fileConterConst = fileCounter; boolean isFailed = false; @@ -134,8 +134,6 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> { } } } - - return null; } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java index 93698ec..49791e8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateMerger.java @@ -116,7 +116,7 @@ public class UnsafeIntermediateMerger { .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION); UnsafeIntermediateFileMerger merger = new UnsafeIntermediateFileMerger(parameters, intermediateFiles, file); - executorService.submit(merger); + executorService.execute(merger); } public void startInmemoryMergingIfPossible() throws CarbonSortKeyAndGroupByException { @@ -149,7 +149,7 @@ public class UnsafeIntermediateMerger { UnsafeInMemoryIntermediateDataMerger merger = new UnsafeInMemoryIntermediateDataMerger(rowPages, totalRows); mergedPages.add(merger); - executorService.submit(merger); + executorService.execute(merger); } private int getTotalNumberOfRows(List<UnsafeCarbonRowPage> unsafeCarbonRowPages) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java deleted file mode 100644 index e1c74a8..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.processing.newflow.steps; - -import java.util.Iterator; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep; -import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration; -import org.apache.carbondata.processing.newflow.DataField; -import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; -import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; - -/** - * DummyClassForTest - */ -public class DummyClassForTest extends AbstractDataLoadProcessorStep { - - private ExecutorService executorService; - - public DummyClassForTest(CarbonDataLoadConfiguration configuration, - AbstractDataLoadProcessorStep child) { - super(configuration, child); - } - - @Override public DataField[] getOutput() { - return child.getOutput(); - } - - @Override public void initialize() throws CarbonDataLoadingException { - - } - - @Override protected String getStepName() { - return "Dummy"; - } - - @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException { - Iterator<CarbonRowBatch>[] iterators = child.execute(); - this.executorService = Executors.newFixedThreadPool(iterators.length); - - try { - for (int i = 0; i < iterators.length; i++) { - executorService.submit(new DummyThread(iterators[i])); - } - executorService.shutdown(); - executorService.awaitTermination(2, TimeUnit.DAYS); - } catch (Exception e) { - throw new CarbonDataLoadingException("Problem while shutdown the server ", e); - } - return null; - } - - @Override protected CarbonRow processRow(CarbonRow row) { - return null; - } -} - -/** - * This thread iterates the iterator - */ -class DummyThread implements Callable<Void> { - - private Iterator<CarbonRowBatch> iterator; - - public DummyThread(Iterator<CarbonRowBatch> iterator) { - this.iterator = iterator; - } - - @Override public Void call() throws CarbonDataLoadingException { - try { - while (iterator.hasNext()) { - CarbonRowBatch batch = iterator.next(); - while (batch.hasNext()) { - CarbonRow row = batch.next(); - // do nothing - } - } - - } catch (Exception e) { - throw new CarbonDataLoadingException(e); - } - return null; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java index 653da7b..7c6a889 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java @@ -25,7 +25,6 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.AbstractQueue; import java.util.PriorityQueue; -import java.util.concurrent.Callable; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -34,7 +33,7 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.NonDictionaryUtil; import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; -public class IntermediateFileMerger implements Callable<Void> { +public class IntermediateFileMerger implements Runnable { /** * LOGGER */ @@ -101,7 +100,8 @@ public class IntermediateFileMerger implements Callable<Void> { noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn(); } - @Override public Void call() throws Exception { + @Override + public void run() { long intermediateMergeStartTime = System.currentTimeMillis(); int fileConterConst = fileCounter; boolean isFailed = false; @@ -148,8 +148,6 @@ public class IntermediateFileMerger implements Callable<Void> { } } } - - return null; } /** @@ -358,6 +356,8 @@ public class IntermediateFileMerger implements Callable<Void> { stream.writeInt(bigDecimalInBytes.length); stream.write(bigDecimalInBytes); break; + default: + throw new IllegalArgumentException("unsupported data type:" + aggType[counter]); } } else { stream.write((byte) 0); http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java index fc575b6..71fc564 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.math.BigDecimal; import java.util.Arrays; import java.util.Random; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @@ -127,7 +126,7 @@ public class SortDataRows { Object[][] recordHolderListLocal = recordHolderList; try { semaphore.acquire(); - dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(recordHolderListLocal)); + dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal)); } catch (InterruptedException e) { LOGGER.error( "exception occurred while trying to acquire a semaphore lock: " + e.getMessage()); @@ -163,7 +162,8 @@ public class SortDataRows { } try { semaphore.acquire(); - dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(recordHolderListLocal)); + dataSorterAndWriterExecutorService + .execute(new DataSorterAndWriter(recordHolderListLocal)); } catch (Exception e) { LOGGER.error( "exception occurred while trying to acquire a semaphore lock: " + e.getMessage()); @@ -310,6 +310,8 @@ public class SortDataRows { stream.writeInt(bigDecimalInBytes.length); stream.write(bigDecimalInBytes); break; + default: + throw new IllegalArgumentException("unsupported data type:" + type[mesCount]); } } else { stream.write((byte) 0); @@ -389,14 +391,15 @@ public class SortDataRows { * This class is responsible for sorting and writing the object * array which holds the records equal to given array size */ - private class DataSorterAndWriter implements Callable<Void> { + private class DataSorterAndWriter implements Runnable { private Object[][] recordHolderArray; public DataSorterAndWriter(Object[][] recordHolderArray) { this.recordHolderArray = recordHolderArray; } - @Override public Void call() throws Exception { + @Override + public void run() { try { long startTime = System.currentTimeMillis(); if (parameters.getNumberOfNoDictSortColumns() > 0) { @@ -420,11 +423,14 @@ public class SortDataRows { LOGGER.info("Time taken to sort and write sort temp file " + sortTempFile + " is: " + ( System.currentTimeMillis() - startTime)); } catch (Throwable e) { - threadStatusObserver.notifyFailed(e); + try { + threadStatusObserver.notifyFailed(e); + } catch (CarbonSortKeyAndGroupByException ex) { + LOGGER.error(ex); + } } finally { semaphore.release(); } - return null; } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java index 383498c..6bda88a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortIntermediateFileMerger.java @@ -91,7 +91,7 @@ public class SortIntermediateFileMerger { chosenTempDir + File.separator + parameters.getTableName() + System .nanoTime() + CarbonCommonConstants.MERGERD_EXTENSION); IntermediateFileMerger merger = new IntermediateFileMerger(parameters, intermediateFiles, file); - executorService.submit(merger); + executorService.execute(merger); } public void finish() throws CarbonSortKeyAndGroupByException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java index 9732e66..10b3ad5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java @@ -359,6 +359,8 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold stream.readFully(buff); measures[index++] = DataTypeUtil.byteToBigDecimal(buff); break; + default: + throw new IllegalArgumentException("unsupported data type:" + aggType[i]); } } else { measures[index++] = null; @@ -448,14 +450,16 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold } @Override public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof SortTempFileChunkHolder)) { return false; } SortTempFileChunkHolder o = (SortTempFileChunkHolder) obj; - - - return o.compareTo(o) == 0; + return this == o; } @Override public int hashCode() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index a716340..251b62e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -190,14 +190,14 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { } } this.version = CarbonProperties.getInstance().getFormatVersion(); - String noInvertedIdxCol = ""; + StringBuffer noInvertedIdxCol = new StringBuffer(); for (CarbonDimension cd : model.getSegmentProperties().getDimensions()) { if (!cd.isUseInvertedIndex()) { - noInvertedIdxCol += (cd.getColName() + ","); + noInvertedIdxCol.append(cd.getColName()).append(","); } } - LOGGER.info("Columns considered as NoInverted Index are " + noInvertedIdxCol); + LOGGER.info("Columns considered as NoInverted Index are " + noInvertedIdxCol.toString()); } private void initParameters(CarbonFactDataHandlerModel model) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/06b0d081/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java index 6178cfb..48227d1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.PriorityQueue; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -198,27 +197,28 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> { for (final File tempFile : files) { - Callable<Void> runnable = new Callable<Void>() { - @Override public Void call() throws CarbonSortKeyAndGroupByException { - // create chunk holder - SortTempFileChunkHolder sortTempFileChunkHolder = - new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount, - measureCount, fileBufferSize, noDictionaryCount, measureDataType, - isNoDictionaryColumn, isNoDictionarySortColumn); - - // initialize - sortTempFileChunkHolder.initialize(); - sortTempFileChunkHolder.readRow(); + Runnable runnable = new Runnable() { + @Override public void run() { + + // create chunk holder + SortTempFileChunkHolder sortTempFileChunkHolder = + new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount, + measureCount, fileBufferSize, noDictionaryCount, measureDataType, + isNoDictionaryColumn, isNoDictionarySortColumn); + try { + // initialize + sortTempFileChunkHolder.initialize(); + sortTempFileChunkHolder.readRow(); + } catch (CarbonSortKeyAndGroupByException ex) { + LOGGER.error(ex); + } synchronized (LOCKOBJECT) { recordHolderHeapLocal.add(sortTempFileChunkHolder); } - - // add to heap - return null; } }; - service.submit(runnable); + service.execute(runnable); } service.shutdown();
