shaofengshi closed pull request #303: KYLIN-3597 Refactor methods to reduce Cognitive Complexity URL: https://github.com/apache/kylin/pull/303
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index dbd19559fe..b1d0f4930e 100755 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -323,35 +323,48 @@ private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IO List<String> toRemoveResources = Lists.newArrayList(); if (update.getToRemoveSegs() != null) { - Iterator<CubeSegment> iterator = newSegs.iterator(); - while (iterator.hasNext()) { - CubeSegment currentSeg = iterator.next(); - for (CubeSegment toRemoveSeg : update.getToRemoveSegs()) { - if (currentSeg.getUuid().equals(toRemoveSeg.getUuid())) { - logger.info("Remove segment {}", currentSeg); - toRemoveResources.add(currentSeg.getStatisticsResourcePath()); - iterator.remove(); - break; - } - } - } + getToRemoveSegments(update, newSegs, toRemoveResources); } if (update.getToUpdateSegs() != null) { - for (CubeSegment segment : update.getToUpdateSegs()) { - for (int i = 0; i < newSegs.size(); i++) { - if (newSegs.get(i).getUuid().equals(segment.getUuid())) { - newSegs.set(i, segment); - break; - } - } - } + updateSegments(update, newSegs); } Collections.sort(newSegs); newSegs.validate(); cube.setSegments(newSegs); + setCubeMember(cube, update); + + try { + cube = crud.save(cube); + } catch (WriteConflictException ise) { + logger.warn("Write conflict to update cube {} at try {}, will retry...", cube.getName(), retry); + if (retry >= 7) { + logger.error("Retried 7 times till got error, abandoning...", ise); + throw ise; + } + + cube = crud.reload(cube.getName()); + 
update.setCubeInstance(cube.latestCopyForWrite()); + return updateCubeWithRetry(update, ++retry); + } + + for (String resource : toRemoveResources) { + try { + getStore().deleteResource(resource); + } catch (IOException ioe) { + logger.error("Failed to delete resource {}", toRemoveResources); + } + } + + //this is a duplicate call to take care of scenarios where REST cache service unavailable + ProjectManager.getInstance(cube.getConfig()).clearL2Cache(cube.getProject()); + + return cube; + } + + private void setCubeMember(CubeInstance cube, CubeUpdate update) { if (update.getStatus() != null) { cube.setStatus(update.getStatus()); } @@ -377,35 +390,32 @@ private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IO cube.putSnapshotResPath(lookupSnapshotPathEntry.getKey(), lookupSnapshotPathEntry.getValue()); } } + } - try { - cube = crud.save(cube); - } catch (WriteConflictException ise) { - logger.warn("Write conflict to update cube {} at try {}, will retry...", cube.getName(), retry); - if (retry >= 7) { - logger.error("Retried 7 times till got error, abandoning...", ise); - throw ise; + private void updateSegments(CubeUpdate update, Segments<CubeSegment> newSegs) { + for (CubeSegment segment : update.getToUpdateSegs()) { + for (int i = 0; i < newSegs.size(); i++) { + if (newSegs.get(i).getUuid().equals(segment.getUuid())) { + newSegs.set(i, segment); + break; + } } - - cube = crud.reload(cube.getName()); - update.setCubeInstance(cube.latestCopyForWrite()); - return updateCubeWithRetry(update, ++retry); } + } - if (toRemoveResources.size() > 0) { - for (String resource : toRemoveResources) { - try { - getStore().deleteResource(resource); - } catch (IOException ioe) { - logger.error("Failed to delete resource {}", toRemoveResources); + private void getToRemoveSegments(CubeUpdate update, Segments<CubeSegment> newSegs, List<String> toRemoveResources) { + Iterator<CubeSegment> iterator = newSegs.iterator(); + while (iterator.hasNext()) { + 
CubeSegment currentSeg = iterator.next(); + for (CubeSegment toRemoveSeg : update.getToRemoveSegs()) { + if (currentSeg.getUuid().equals(toRemoveSeg.getUuid())) { + logger.info("Remove segment {}", currentSeg); + toRemoveResources.add(currentSeg.getStatisticsResourcePath()); + iterator.remove(); + break; } } } - - //this is a duplicate call to take care of scenarios where REST cache service unavailable - ProjectManager.getInstance(cube.getConfig()).clearL2Cache(cube.getProject()); - - return cube; } // for test @@ -746,15 +756,7 @@ public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRang if (cubeCopy.getSegments().getFirstSegment().isOffsetCube()) { // offset cube, merge by date range? - if (segRange == null && tsRange != null) { - Pair<CubeSegment, CubeSegment> pair = cubeCopy.getSegments(SegmentStatusEnum.READY) - .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE); - if (pair == null) - throw new IllegalArgumentException( - "Find no segments to merge by " + tsRange + " for cube " + cubeCopy); - segRange = new SegmentRange(pair.getFirst().getSegRange().start, - pair.getSecond().getSegRange().end); - } + segRange = offsetCubeSegRange(cubeCopy, tsRange, segRange); tsRange = null; Preconditions.checkArgument(segRange != null); } else { @@ -779,12 +781,9 @@ public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRang CubeSegment first = mergingSegments.get(0); CubeSegment last = mergingSegments.get(mergingSegments.size() - 1); if (force == false) { - for (int i = 0; i < mergingSegments.size() - 1; i++) { - if (!mergingSegments.get(i).getSegRange().connects(mergingSegments.get(i + 1).getSegRange())) - throw new IllegalStateException("Merging segments must not have gaps between " - + mergingSegments.get(i) + " and " + mergingSegments.get(i + 1)); - } + checkReadyForMerge(mergingSegments); } + if (first.isOffsetCube()) { newSegment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end)); 
newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart()); @@ -795,21 +794,6 @@ public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRang newSegment.setSegRange(null); } - if (force == false) { - List<String> emptySegment = Lists.newArrayList(); - for (CubeSegment seg : mergingSegments) { - if (seg.getSizeKB() == 0 && seg.getInputRecords() == 0) { - emptySegment.add(seg.getName()); - } - } - - if (emptySegment.size() > 0) { - throw new IllegalArgumentException( - "Empty cube segment found, couldn't merge unless 'forceMergeEmptySegment' set to true: " - + emptySegment); - } - } - validateNewSegments(cubeCopy, newSegment); CubeUpdate update = new CubeUpdate(cubeCopy); @@ -819,6 +803,43 @@ public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRang return newSegment; } + private void checkReadyForMerge(Segments<CubeSegment> mergingSegments) { + + // check if the segments to be merged are continuous + for (int i = 0; i < mergingSegments.size() - 1; i++) { + if (!mergingSegments.get(i).getSegRange().connects(mergingSegments.get(i + 1).getSegRange())) + throw new IllegalStateException("Merging segments must not have gaps between " + + mergingSegments.get(i) + " and " + mergingSegments.get(i + 1)); + } + + // check if the segments to be merged are not empty + List<String> emptySegment = Lists.newArrayList(); + for (CubeSegment seg : mergingSegments) { + if (seg.getSizeKB() == 0 && seg.getInputRecords() == 0) { + emptySegment.add(seg.getName()); + } + } + + if (emptySegment.size() > 0) { + throw new IllegalArgumentException( + "Empty cube segment found, couldn't merge unless 'forceMergeEmptySegment' set to true: " + + emptySegment); + } + } + + private SegmentRange offsetCubeSegRange(CubeInstance cubeCopy, TSRange tsRange, SegmentRange segRange) { + if (segRange == null && tsRange != null) { + Pair<CubeSegment, CubeSegment> pair = cubeCopy.getSegments(SegmentStatusEnum.READY) + 
.findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE); + if (pair == null) + throw new IllegalArgumentException( + "Find no segments to merge by " + tsRange + " for cube " + cubeCopy); + segRange = new SegmentRange(pair.getFirst().getSegRange().start, + pair.getSecond().getSegRange().end); + } + return segRange; + } + private void checkInputRanges(TSRange tsRange, SegmentRange segRange) { if (tsRange != null && segRange != null) { throw new IllegalArgumentException( diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java index 6dd16fe494..fb944dce0d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java @@ -172,19 +172,18 @@ private static void jumpScan(Connection conn, boolean[] hits, Stats stats) throw int i = 0; while (i < N_ROWS) { - int start; - int end; - for (start = i; start < N_ROWS; start++) { - if (hits[start]) - break; - } - for (end = start + 1; end < N_ROWS; end++) { - boolean isEnd = true; - for (int j = 0; j < jumpThreshold && end + j < N_ROWS; j++) - if (hits[end + j]) - isEnd = false; - if (isEnd) - break; + // find the first hit + int start = i; + while (start + 1 < N_ROWS && !hits[start]) start++; + + // find the last hit within jumpThreshold + int end = start + 1; + int jump = end + 1; + while (jump < N_ROWS && (end + jumpThreshold > jump)) { + if (hits[jump]) { + end = jump; + } + jump++; } if (start < N_ROWS) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services