This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 17e6c565529f2c04bd883bb12fea67e4069dfc05 Author: etherge <[email protected]> AuthorDate: Thu Jan 23 17:51:50 2020 -0500 minor, sonar issues for unchecked stream read --- .../apache/kylin/gridtable/GTAggregateScanner.java | 15 ++--- .../flink/util/PercentileCounterSerializer.java | 18 ++++-- .../spark/util/PercentileCounterSerializer.java | 11 +++- .../core/storage/columnar/FragmentFilesMerger.java | 72 ++++++++++++++-------- 4 files changed, 78 insertions(+), 38 deletions(-) diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index 75e79d0..78e413a 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -434,8 +434,8 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker { sumSpilledSize += dump.size(); // when spilled data is too much, we can modify it by other strategy. // this means, all spilled data is bigger than half of original spillThreshold. - if(sumSpilledSize > spillThreshold) { - for(Dump current : dumps) { + if (sumSpilledSize > spillThreshold) { + for (Dump current : dumps) { current.spill(); } spillThreshold += sumSpilledSize; @@ -678,7 +678,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker { + (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath())); } - if(spillBuffer == null) { + if (spillBuffer == null) { dis = new DataInputStream(new FileInputStream(dumpedFile)); } else { dis = new DataInputStream(new ByteArrayInputStream(spillBuffer)); @@ -698,10 +698,10 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker { cursorIdx++; int keyLen = dis.readInt(); byte[] key = new byte[keyLen]; - dis.read(key); + dis.readFully(key); int valueLen = dis.readInt(); byte[] value = new byte[valueLen]; - dis.read(value); + dis.readFully(value); return new Pair<>(key, value); } catch (Exception e) { throw new RuntimeException( @@ -720,7 +720,8 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker { } public void spill() throws IOException { - if(spillBuffer == null) return; + if (spillBuffer == null) + return; OutputStream ops = new FileOutputStream(dumpedFile); InputStream ips = new ByteArrayInputStream(spillBuffer); IOUtils.copy(ips, ops); @@ -729,7 +730,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker { IOUtils.closeQuietly(ops); logger.info("Spill buffer to disk, location: {}, size = {}.", dumpedFile.getAbsolutePath(), - dumpedFile.length()); + dumpedFile.length()); } public int size() { diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/util/PercentileCounterSerializer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/util/PercentileCounterSerializer.java index 684f85e..a0d8f1c 100644 --- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/util/PercentileCounterSerializer.java +++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/util/PercentileCounterSerializer.java @@ -19,13 +19,14 @@ package org.apache.kylin.engine.flink.util; +import java.nio.ByteBuffer; + +import org.apache.kylin.measure.percentile.PercentileCounter; + import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import org.apache.kylin.measure.percentile.PercentileCounter; - -import java.nio.ByteBuffer; /** * A customized kryo serializer for {@link PercentileCounter} @@ -49,7 +50,16 @@ public class PercentileCounterSerializer extends Serializer<PercentileCounter> { double quantileRatio = input.readDouble(); int length = input.readInt(); byte[] buffer = new byte[length]; - input.read(buffer); + + int offset = 0; + int bytesRead; + while ((bytesRead = input.read(buffer, offset, buffer.length - offset)) != -1) { + offset += bytesRead; + if (offset >= buffer.length) { + break; + } + } + PercentileCounter counter = new PercentileCounter(compression, quantileRatio); counter.readRegisters(ByteBuffer.wrap(buffer)); return counter; diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/PercentileCounterSerializer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/PercentileCounterSerializer.java index c9ba0f3..1e256f2 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/PercentileCounterSerializer.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/PercentileCounterSerializer.java @@ -47,7 +47,16 @@ public class PercentileCounterSerializer extends Serializer<PercentileCounter> { double quantileRatio = input.readDouble(); int length = input.readInt(); byte[] buffer = new byte[length]; - input.read(buffer); + + int offset = 0; + int bytesRead; + while ((bytesRead = input.read(buffer, offset, buffer.length - offset)) != -1) { + offset += bytesRead; + if (offset >= buffer.length) { + break; + } + } + PercentileCounter counter = new PercentileCounter(compression, quantileRatio); counter.readRegisters(ByteBuffer.wrap(buffer)); return counter; diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFilesMerger.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFilesMerger.java index 4a9c398..627781b 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFilesMerger.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFilesMerger.java @@ -93,8 +93,8 @@ public class FragmentFilesMerger { } Collections.sort(fragmentList); - FragmentId mergedFragmentId = new FragmentId(fragmentList.get(0).getFragmentId().getStartId(), fragmentList - .get(fragmentList.size() - 1).getFragmentId().getEndId()); + FragmentId mergedFragmentId = new FragmentId(fragmentList.get(0).getFragmentId().getStartId(), + fragmentList.get(fragmentList.size() - 1).getFragmentId().getEndId()); List<FragmentData> fragmentDataList = Lists.newArrayList(); Map<TblColRef, List<Dictionary<String>>> dimDictListMap = Maps.newHashMap(); Map<FragmentId, Map<TblColRef, Dictionary<String>>> fragmentDictionaryMaps = Maps.newHashMap(); @@ -146,8 +146,8 @@ public class FragmentFilesMerger { File mergedFragmentMetaFile = new File(mergeWorkingDirectory, mergedFragmentId + Constants.META_FILE_SUFFIX); try { FragmentMetaInfo mergedFragmentMeta = new FragmentMetaInfo(); - CountingOutputStream fragmentDataOutput = new CountingOutputStream(new BufferedOutputStream( - FileUtils.openOutputStream(mergedFragmentDataFile))); + CountingOutputStream fragmentDataOutput = new CountingOutputStream( + new BufferedOutputStream(FileUtils.openOutputStream(mergedFragmentDataFile))); // merge dictionaries Map<TblColRef, Dictionary<String>> mergedDictMap = mergeAndPersistDictionaries(mergedFragmentMeta, dimDictListMap, fragmentDataOutput); @@ -202,8 +202,8 @@ public class FragmentFilesMerger { List<DimDictionaryMetaInfo> dimDictionaryMetaInfos = Lists.newArrayList(); for (TblColRef dimension : parsedCubeInfo.dimensionsUseDictEncoding) { List<Dictionary<String>> dicts = dimDictListMap.get(dimension); - MultipleDictionaryValueEnumerator multipleDictionaryValueEnumerator = new MultipleDictionaryValueEnumerator(dimension.getType(), - dicts); + MultipleDictionaryValueEnumerator multipleDictionaryValueEnumerator = new MultipleDictionaryValueEnumerator( + dimension.getType(), dicts); Dictionary<String> mergedDict = DictionaryGenerator.buildDictionary(dimension.getType(), multipleDictionaryValueEnumerator); mergedDictMap.put(dimension, mergedDict); @@ -240,10 +240,10 @@ public class FragmentFilesMerger { } else { cuboidMetaInfo = fragmentMetaInfo.getCuboidMetaInfo(cuboidId); } - Map<TblColRef, Dictionary<String>> dictMap = fragmentDictionaryMaps.get(FragmentId.parse(fragmentMetaInfo - .getFragmentId())); - DimensionEncoding[] dimensionEncodings = ParsedStreamingCubeInfo.getDimensionEncodings( - parsedCubeInfo.cubeDesc, dimensions, dictMap); + Map<TblColRef, Dictionary<String>> dictMap = fragmentDictionaryMaps + .get(FragmentId.parse(fragmentMetaInfo.getFragmentId())); + DimensionEncoding[] dimensionEncodings = ParsedStreamingCubeInfo + .getDimensionEncodings(parsedCubeInfo.cubeDesc, dimensions, dictMap); FragmentCuboidReader fragmentCuboidReader = new FragmentCuboidReader(parsedCubeInfo.cubeDesc, fragmentData, cuboidMetaInfo, cuboidInfo.getDimensions(), parsedCubeInfo.measureDescs, dimensionEncodings); fragmentCuboidReaders.add(fragmentCuboidReader); @@ -264,7 +264,8 @@ public class FragmentFilesMerger { if (dict instanceof TrieDictionary) { invertIndexColDescs[i] = new SeqIIColumnDescriptor(dim.getName(), dict.getMinId(), dict.getMaxId()); } else { - invertIndexColDescs[i] = new FixLenIIColumnDescriptor(dim.getName(), encoding.getLengthOfEncoding()); + invertIndexColDescs[i] = new FixLenIIColumnDescriptor(dim.getName(), + encoding.getLengthOfEncoding()); } } else { invertIndexColDescs[i] = new FixLenIIColumnDescriptor(dim.getName(), encoding.getLengthOfEncoding()); @@ -282,8 +283,8 @@ public class FragmentFilesMerger { for (int i = 0; i < metricDataWriters.length; i++) { metricDataWriters[i] = new CuboidMetricDataWriter(cuboidId, parsedCubeInfo.measureDescs[i].getName(), parsedCubeInfo.getMeasureTypeSerializer(i).maxLength()); - metricsEncodings[i] = ColumnarMetricsEncodingFactory.create(parsedCubeInfo.measureDescs[i].getFunction() - .getReturnDataType()); + metricsEncodings[i] = ColumnarMetricsEncodingFactory + .create(parsedCubeInfo.measureDescs[i].getFunction().getReturnDataType()); } FragmentCuboidDataMerger fragmentCuboidDataMerger = new FragmentCuboidDataMerger(cuboidInfo, @@ -324,19 +325,29 @@ public class FragmentFilesMerger { for (int i = 0; i < dimDataWriters.length; i++) { DimensionEncoding encoding = mergedDimEncodings[i]; int dimFixLen = encoding.getLengthOfEncoding(); - InputStream dimInput = new BufferedInputStream(FileUtils.openInputStream(dimDataWriters[i].getOutputFile())); + InputStream dimInput = new BufferedInputStream( + FileUtils.openInputStream(dimDataWriters[i].getOutputFile())); try { DimensionMetaInfo dimensionMeta = new DimensionMetaInfo(); dimensionMeta.setName(dimensions[i].getName()); int startOffset = (int) fragmentDataOutput.getCount(); dimensionMeta.setStartOffset(startOffset); - ColumnarStoreDimDesc cStoreDimDesc = ColumnarStoreDimDesc.getDefaultCStoreDimDesc(parsedCubeInfo.cubeDesc, - dimensions[i].getName(), encoding); + ColumnarStoreDimDesc cStoreDimDesc = ColumnarStoreDimDesc + .getDefaultCStoreDimDesc(parsedCubeInfo.cubeDesc, dimensions[i].getName(), encoding); ColumnDataWriter columnDataWriter = cStoreDimDesc.getDimWriter(fragmentDataOutput, rowCnt); for (int j = 0; j < rowCnt; j++) { byte[] dimValue = new byte[dimFixLen]; - dimInput.read(dimValue); + + int offset = 0; + int bytesRead; + while ((bytesRead = dimInput.read(dimValue, offset, dimValue.length - offset)) != -1) { + offset += bytesRead; + if (offset >= dimValue.length) { + break; + } + } + if (DimensionEncoding.isNull(dimValue, 0, dimValue.length)) { dimensionMeta.setHasNull(true); } @@ -358,8 +369,8 @@ public class FragmentFilesMerger { } for (int i = 0; i < metricDataWriters.length; i++) { - DataInputStream metricInput = new DataInputStream(new BufferedInputStream( - FileUtils.openInputStream(metricDataWriters[i].getOutputFile()))); + DataInputStream metricInput = new DataInputStream( + new BufferedInputStream(FileUtils.openInputStream(metricDataWriters[i].getOutputFile()))); try { ColumnarMetricsEncoding metricsEncoding = ColumnarMetricsEncodingFactory .create(parsedCubeInfo.measureDescs[i].getFunction().getReturnDataType()); @@ -373,7 +384,16 @@ public class FragmentFilesMerger { for (int j = 0; j < rowCnt; j++) { int metricLen = metricInput.readInt(); byte[] metricValue = new byte[metricLen]; - metricInput.read(metricValue); + + int offset = 0; + int bytesRead; + while ((bytesRead = metricInput.read(metricValue, offset, metricValue.length - offset)) != -1) { + offset += bytesRead; + if (offset >= metricValue.length) { + break; + } + } + columnDataWriter.write(metricValue); } columnDataWriter.flush(); @@ -404,7 +424,8 @@ public class FragmentFilesMerger { this.colName = colName; this.tmpColDataFile = new File(mergeWorkingDirectory, cuboidId + "-" + colName + ".data"); - this.output = new CountingOutputStream(new BufferedOutputStream(FileUtils.openOutputStream(tmpColDataFile))); + this.output = new CountingOutputStream( + new BufferedOutputStream(FileUtils.openOutputStream(tmpColDataFile))); } public void write(byte[] value) throws IOException { @@ -437,8 +458,8 @@ public class FragmentFilesMerger { this.metricName = metricName; this.maxValLen = maxValLen; this.tmpMetricDataFile = new File(mergeWorkingDirectory, cuboidId + "-" + metricName + ".data"); - this.countingOutput = new CountingOutputStream(new BufferedOutputStream( - FileUtils.openOutputStream(tmpMetricDataFile))); + this.countingOutput = new CountingOutputStream( + new BufferedOutputStream(FileUtils.openOutputStream(tmpMetricDataFile))); this.output = new DataOutputStream(countingOutput); } @@ -534,9 +555,8 @@ public class FragmentFilesMerger { enqueueFromFragment(currRecordEntry.getSecond()); boolean needAggregate = false; boolean first = true; - while ((!minHeap.isEmpty()) - && StringArrayComparator.INSTANCE.compare(currRecord.dimensions, - minHeap.peek().getFirst().dimensions) == 0) { + while ((!minHeap.isEmpty()) && StringArrayComparator.INSTANCE.compare(currRecord.dimensions, + minHeap.peek().getFirst().dimensions) == 0) { if (first) { doAggregate(currRecord); first = false;
