http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java new file mode 100644 index 0000000..69d2a3b --- /dev/null +++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordReader.java @@ -0,0 +1,761 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.streaming; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.reader.CarbonHeaderReader; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.intf.RowImpl; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.util.CarbonUtil; +import 
org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.format.BlockletHeader; +import org.apache.carbondata.format.FileHeader; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.InputMetricsStats; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; +import org.apache.spark.sql.types.CalendarIntervalType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * Stream record reader + */ +public class CarbonStreamRecordReader extends RecordReader<Void, Object> { + // vector reader + private boolean isVectorReader; + + // metadata + private CarbonTable carbonTable; + private CarbonColumn[] storageColumns; + private boolean[] isRequired; + private DataType[] measureDataTypes; + private int dimensionCount; + private int measureCount; + + // input + private FileSplit fileSplit; + private Configuration hadoopConf; + private StreamBlockletReader input; + private boolean isFirstRow = true; + private QueryModel model; + + // decode data + private BitSet allNonNull; + private boolean[] isNoDictColumn; + private DirectDictionaryGenerator[] directDictionaryGenerators; + private CacheProvider cacheProvider; + private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache; + private GenericQueryType[] queryTypes; + + // vectorized reader + private StructType outputSchema; + private ColumnarBatch columnarBatch; + private boolean isFinished = false; + + // filter + private FilterExecuter filter; + private boolean[] isFilterRequired; + private Object[] filterValues; + private RowIntf filterRow; + private int[] filterMap; + + // output + private CarbonColumn[] projection; + private boolean[] isProjectionRequired; + private int[] projectionMap; + private Object[] outputValues; + private InternalRow outputRow; + + // empty project, null filter + private boolean skipScanData; + + // return raw row for handoff + private boolean useRawRow = false; + + // InputMetricsStats + private InputMetricsStats inputMetricsStats; + + @Override public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + // input + if (split instanceof CarbonInputSplit) { + fileSplit = (CarbonInputSplit) split; + } else if (split instanceof CarbonMultiBlockSplit) { + fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0); + } else { + fileSplit = (FileSplit) split; + } + + // metadata + hadoopConf = context.getConfiguration(); + if (model == null) { + 
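+ // no QueryModel was injected via setQueryModel(), so derive one from the input split and task context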
CarbonTableInputFormat format = new CarbonTableInputFormat<Object>(); + model = format.createQueryModel(split, context); + } + carbonTable = model.getTable(); + List<CarbonDimension> dimensions = + carbonTable.getDimensionByTableName(carbonTable.getTableName()); + dimensionCount = dimensions.size(); + List<CarbonMeasure> measures = + carbonTable.getMeasureByTableName(carbonTable.getTableName()); + measureCount = measures.size(); + List<CarbonColumn> carbonColumnList = + carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName()); + storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]); + isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns); + directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length]; + for (int i = 0; i < storageColumns.length; i++) { + if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) { + directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(storageColumns[i].getDataType()); + } + } + measureDataTypes = new DataType[measureCount]; + for (int i = 0; i < measureCount; i++) { + measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType(); + } + + // decode data + allNonNull = new BitSet(storageColumns.length); + projection = model.getProjectionColumns(); + + isRequired = new boolean[storageColumns.length]; + boolean[] isFiltlerDimensions = model.getIsFilterDimensions(); + boolean[] isFiltlerMeasures = model.getIsFilterMeasures(); + isFilterRequired = new boolean[storageColumns.length]; + filterMap = new int[storageColumns.length]; + for (int i = 0; i < storageColumns.length; i++) { + if (storageColumns[i].isDimension()) { + if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) { + isRequired[i] = true; + isFilterRequired[i] = true; + filterMap[i] = storageColumns[i].getOrdinal(); + } + } else { + if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) { + isRequired[i] = true; + isFilterRequired[i] = true; + filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal(); + } + } + } + + isProjectionRequired = new boolean[storageColumns.length]; + projectionMap = new int[storageColumns.length]; + for (int i = 0; i < storageColumns.length; i++) { + for (int j = 0; j < projection.length; j++) { + if (storageColumns[i].getColName().equals(projection[j].getColName())) { + isRequired[i] = true; + isProjectionRequired[i] = true; + projectionMap[i] = j; + break; + } + } + } + + // initialize filter + if (null != model.getFilterExpressionResolverTree()) { + initializeFilter(); + } else if (projection.length == 0) { + skipScanData = true; + } + + } + + private void initializeFilter() { + + List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil + .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()), + carbonTable.getMeasureByTableName(carbonTable.getTableName())); + int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()]; + for (int i = 0; i < dimLensWithComplex.length; i++) { + dimLensWithComplex[i] = Integer.MAX_VALUE; + } + + int[] dictionaryColumnCardinality = + CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList); + SegmentProperties segmentProperties = + new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality); + Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>(); + + FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree(); + filter = 
FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties, + complexDimensionInfoMap); + // for row filter, we need update column index + FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(), + carbonTable.getDimensionOrdinalMax()); + + } + + public void setQueryModel(QueryModel model) { + this.model = model; + } + + private byte[] getSyncMarker(String filePath) throws IOException { + CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath); + FileHeader header = headerReader.readHeader(); + return header.getSync_marker(); + } + + public void setUseRawRow(boolean useRawRow) { + this.useRawRow = useRawRow; + } + + private void initializeAtFirstRow() throws IOException { + filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount]; + filterRow = new RowImpl(); + filterRow.setValues(filterValues); + + outputValues = new Object[projection.length]; + outputRow = new GenericInternalRow(outputValues); + + Path file = fileSplit.getPath(); + + byte[] syncMarker = getSyncMarker(file.toString()); + + FileSystem fs = file.getFileSystem(hadoopConf); + + int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE, + CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT)); + + FSDataInputStream fileIn = fs.open(file, bufferSize); + fileIn.seek(fileSplit.getStart()); + input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(), + fileSplit.getStart() == 0); + + cacheProvider = CacheProvider.getInstance(); + cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY); + queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache); + + outputSchema = new StructType((StructField[]) + DataTypeUtil.getDataTypeConverter().convertCarbonSchemaToSparkSchema(projection)); + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + if (isFirstRow) { + isFirstRow = false; + initializeAtFirstRow(); + } + if (isFinished) { + return false; + } + + if (isVectorReader) { + return nextColumnarBatch(); + } + + return nextRow(); + } + + /** + * for vector reader, check next columnar batch + */ + private boolean nextColumnarBatch() throws IOException { + boolean hasNext; + boolean scanMore = false; + do { + // move to the next blocklet + hasNext = input.nextBlocklet(); + if (hasNext) { + // read blocklet header + BlockletHeader header = input.readBlockletHeader(); + if (isScanRequired(header)) { + scanMore = !scanBlockletAndFillVector(header); + } else { + input.skipBlockletData(true); + scanMore = true; + } + } else { + isFinished = true; + scanMore = false; + } + } while (scanMore); + return hasNext; + } + + /** + * check next Row + */ + private boolean nextRow() throws IOException { + // read row one by one + try { + boolean hasNext; + boolean scanMore = false; + do { + hasNext = input.hasNext(); + if (hasNext) { + if (skipScanData) { + input.nextRow(); + scanMore = false; + } else { + if (useRawRow) { + // read raw row for streaming handoff which does not require decode raw row + readRawRowFromStream(); + } else { + readRowFromStream(); + } + if (null != filter) { + scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax()); + } else { + scanMore = false; + } + } + } else { + if (input.nextBlocklet()) { + BlockletHeader header = input.readBlockletHeader(); + if (isScanRequired(header)) { + if (skipScanData) { + input.skipBlockletData(false); + } else { + input.readBlockletData(header); + } + } else { + input.skipBlockletData(true); + 
} + scanMore = true; + } else { + isFinished = true; + scanMore = false; + } + } + } while (scanMore); + return hasNext; + } catch (FilterUnsupportedException e) { + throw new IOException("Failed to filter row in detail reader", e); + } + } + + @Override public Void getCurrentKey() throws IOException, InterruptedException { + return null; + } + + @Override public Object getCurrentValue() throws IOException, InterruptedException { + if (isVectorReader) { + int value = columnarBatch.numValidRows(); + if (inputMetricsStats != null) { + inputMetricsStats.incrementRecordRead((long) value); + } + + return columnarBatch; + } + + if (inputMetricsStats != null) { + inputMetricsStats.incrementRecordRead(1L); + } + + return outputRow; + } + + private boolean isScanRequired(BlockletHeader header) { + // TODO require to implement min-max index + if (null == filter) { + return true; + } + return true; + } + + private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException { + // if filter is null and output projection is empty, use the row number of blocklet header + if (skipScanData) { + int rowNums = header.getBlocklet_info().getNum_rows(); + columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, rowNums); + columnarBatch.setNumRows(rowNums); + input.skipBlockletData(true); + return rowNums > 0; + } + + input.readBlockletData(header); + columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, input.getRowNums()); + int rowNum = 0; + if (null == filter) { + while (input.hasNext()) { + readRowFromStream(); + putRowToColumnBatch(rowNum++); + } + } else { + try { + while (input.hasNext()) { + readRowFromStream(); + if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) { + putRowToColumnBatch(rowNum++); + } + } + } catch (FilterUnsupportedException e) { + throw new IOException("Failed to filter row in vector reader", e); + } + } + columnarBatch.setNumRows(rowNum); + return rowNum > 0; + } + + private void readRowFromStream() { + input.nextRow(); + short nullLen = input.readShort(); + BitSet nullBitSet = allNonNull; + if (nullLen > 0) { + nullBitSet = BitSet.valueOf(input.readBytes(nullLen)); + } + int colCount = 0; + // primitive type dimension + for (; colCount < isNoDictColumn.length; colCount++) { + if (nullBitSet.get(colCount)) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = null; + } + } else { + if (isNoDictColumn[colCount]) { + int v = input.readShort(); + if (isRequired[colCount]) { + byte[] b = input.readBytes(v); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = b; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = + DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(b, + storageColumns[colCount].getDataType()); + } + } else { + input.skipBytes(v); + } + } else if (null != directDictionaryGenerators[colCount]) { + if (isRequired[colCount]) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = input.copy(4); + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = + directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt()); + } else { + input.skipBytes(4); + } + } else { + input.skipBytes(4); + } + } else { + if (isRequired[colCount]) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = input.copy(4); + } + if 
(isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = input.readInt(); + } else { + input.skipBytes(4); + } + } else { + input.skipBytes(4); + } + } + } + } + // complex type dimension + for (; colCount < dimensionCount; colCount++) { + if (nullBitSet.get(colCount)) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = null; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = null; + } + } else { + short v = input.readShort(); + if (isRequired[colCount]) { + byte[] b = input.readBytes(v); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = b; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = queryTypes[colCount] + .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(b)); + } + } else { + input.skipBytes(v); + } + } + } + // measure + DataType dataType; + for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) { + if (nullBitSet.get(colCount)) { + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = null; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = null; + } + } else { + dataType = measureDataTypes[msrCount]; + if (dataType == DataTypes.BOOLEAN) { + if (isRequired[colCount]) { + boolean v = input.readBoolean(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(1); + } + } else if (dataType == DataTypes.SHORT) { + if (isRequired[colCount]) { + short v = input.readShort(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(2); + } + } else if (dataType == DataTypes.INT) { + if (isRequired[colCount]) { + int v = input.readInt(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(4); + } + } else if (dataType == DataTypes.LONG) { + if (isRequired[colCount]) { + long v = input.readLong(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(8); + } + } else if (dataType == DataTypes.DOUBLE) { + if (isRequired[colCount]) { + double v = input.readDouble(); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = v; + } + } else { + input.skipBytes(8); + } + } else if (DataTypes.isDecimal(dataType)) { + int len = input.readShort(); + if (isRequired[colCount]) { + BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len)); + if (isFilterRequired[colCount]) { + filterValues[filterMap[colCount]] = v; + } + if (isProjectionRequired[colCount]) { + outputValues[projectionMap[colCount]] = + DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(v); + } + } else { + input.skipBytes(len); + } + } + } + } + } + + private void readRawRowFromStream() { + input.nextRow(); + short nullLen = input.readShort(); + BitSet nullBitSet = allNonNull; + if (nullLen > 0) { + nullBitSet = BitSet.valueOf(input.readBytes(nullLen)); + } + int colCount = 0; + // primitive type dimension + for (; colCount < 
isNoDictColumn.length; colCount++) { + if (nullBitSet.get(colCount)) { + outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; + } else { + if (isNoDictColumn[colCount]) { + int v = input.readShort(); + outputValues[colCount] = input.readBytes(v); + } else { + outputValues[colCount] = input.readInt(); + } + } + } + // complex type dimension + for (; colCount < dimensionCount; colCount++) { + if (nullBitSet.get(colCount)) { + outputValues[colCount] = null; + } else { + short v = input.readShort(); + outputValues[colCount] = input.readBytes(v); + } + } + // measure + DataType dataType; + for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) { + if (nullBitSet.get(colCount)) { + outputValues[colCount] = null; + } else { + dataType = measureDataTypes[msrCount]; + if (dataType == DataTypes.BOOLEAN) { + outputValues[colCount] = input.readBoolean(); + } else if (dataType == DataTypes.SHORT) { + outputValues[colCount] = input.readShort(); + } else if (dataType == DataTypes.INT) { + outputValues[colCount] = input.readInt(); + } else if (dataType == DataTypes.LONG) { + outputValues[colCount] = input.readLong(); + } else if (dataType == DataTypes.DOUBLE) { + outputValues[colCount] = input.readDouble(); + } else if (DataTypes.isDecimal(dataType)) { + int len = input.readShort(); + outputValues[colCount] = DataTypeUtil.byteToBigDecimal(input.readBytes(len)); + } + } + } + } + + private void putRowToColumnBatch(int rowId) { + for (int i = 0; i < projection.length; i++) { + Object value = outputValues[i]; + ColumnVector col = columnarBatch.column(i); + org.apache.spark.sql.types.DataType t = col.dataType(); + if (null == value) { + col.putNull(rowId); + } else { + if (t == org.apache.spark.sql.types.DataTypes.BooleanType) { + col.putBoolean(rowId, (boolean)value); + } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) { + col.putByte(rowId, (byte) value); + } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) { + col.putShort(rowId, (short) value); + } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) { + col.putInt(rowId, (int) value); + } else if (t == org.apache.spark.sql.types.DataTypes.LongType) { + col.putLong(rowId, (long) value); + } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) { + col.putFloat(rowId, (float) value); + } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) { + col.putDouble(rowId, (double) value); + } else if (t == org.apache.spark.sql.types.DataTypes.StringType) { + UTF8String v = (UTF8String) value; + col.putByteArray(rowId, v.getBytes()); + } else if (t instanceof org.apache.spark.sql.types.DecimalType) { + DecimalType dt = (DecimalType)t; + Decimal d = Decimal.fromDecimal(value); + if (dt.precision() <= Decimal.MAX_INT_DIGITS()) { + col.putInt(rowId, (int)d.toUnscaledLong()); + } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) { + col.putLong(rowId, d.toUnscaledLong()); + } else { + final BigInteger integer = d.toJavaBigDecimal().unscaledValue(); + byte[] bytes = integer.toByteArray(); + col.putByteArray(rowId, bytes, 0, bytes.length); + } + } else if (t instanceof CalendarIntervalType) { + CalendarInterval c = (CalendarInterval) value; + col.getChildColumn(0).putInt(rowId, c.months); + col.getChildColumn(1).putLong(rowId, c.microseconds); + } else if (t instanceof org.apache.spark.sql.types.DateType) { + col.putInt(rowId, (int) value); + } else if (t instanceof org.apache.spark.sql.types.TimestampType) { + col.putLong(rowId, (long) value); + } + } + } + } + + @Override 
public float getProgress() throws IOException, InterruptedException { + return 0; + } + + public void setVectorReader(boolean isVectorReader) { + this.isVectorReader = isVectorReader; + } + + public void setInputMetricsStats(InputMetricsStats inputMetricsStats) { + this.inputMetricsStats = inputMetricsStats; + } + + @Override public void close() throws IOException { + if (null != input) { + input.close(); + } + if (null != columnarBatch) { + columnarBatch.close(); + } + } +}
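For reference, the row layout decoded by readRowFromStream() and readRawRowFromStream() above is: a 2-byte null-bitset length followed by the bitset bytes, then each primitive dimension (length-prefixed bytes for a no-dictionary column, a 4-byte surrogate key otherwise), each complex dimension as length-prefixed serialized bytes, and finally the measures in their primitive widths, with decimals length-prefixed. The sketch below walks that layout with java.nio.ByteBuffer; the schema inputs (isNoDict, complexDimCount, measureTypes) are hypothetical simplifications rather than CarbonData API, and decimal bytes are kept raw where the real reader calls DataTypeUtil.byteToBigDecimal().

import java.nio.ByteBuffer;
import java.util.BitSet;

/** Illustrative decoder for the stream-file row layout read by CarbonStreamRecordReader. */
public final class StreamRowLayoutSketch {

  /**
   * Decodes one row. isNoDict flags the primitive dimensions, complexDimCount counts the
   * complex dimensions, and measureTypes encodes measure types as chars (all hypothetical).
   */
  static Object[] decodeRow(ByteBuffer in, boolean[] isNoDict, int complexDimCount,
      char[] measureTypes) {
    Object[] row = new Object[isNoDict.length + complexDimCount + measureTypes.length];
    // 1. null bitset: 2-byte length, then the bitset bytes (length may be 0)
    short nullLen = in.getShort();
    BitSet nulls = new BitSet();
    if (nullLen > 0) {
      byte[] b = new byte[nullLen];
      in.get(b);
      nulls = BitSet.valueOf(b);
    }
    int col = 0;
    // 2. primitive dimensions: length-prefixed bytes if no-dictionary, else a 4-byte surrogate
    for (; col < isNoDict.length; col++) {
      if (nulls.get(col)) continue;            // null cell: nothing was written for it
      if (isNoDict[col]) {
        byte[] v = new byte[in.getShort()];
        in.get(v);
        row[col] = v;
      } else {
        row[col] = in.getInt();
      }
    }
    // 3. complex dimensions: always length-prefixed serialized bytes
    for (int i = 0; i < complexDimCount; i++, col++) {
      if (nulls.get(col)) continue;
      byte[] v = new byte[in.getShort()];
      in.get(v);
      row[col] = v;
    }
    // 4. measures in their primitive widths; decimals are length-prefixed bytes
    for (char t : measureTypes) {
      if (!nulls.get(col)) {
        if (t == 'b') { row[col] = in.get() != 0; }        // boolean
        else if (t == 's') { row[col] = in.getShort(); }   // short
        else if (t == 'i') { row[col] = in.getInt(); }     // int
        else if (t == 'l') { row[col] = in.getLong(); }    // long
        else if (t == 'd') { row[col] = in.getDouble(); }  // double
        else {                                             // decimal: kept raw here; the real
          byte[] v = new byte[in.getShort()];              // reader converts these bytes with
          in.get(v);                                       // DataTypeUtil.byteToBigDecimal()
          row[col] = v;
        }
      }
      col++;
    }
    return row;
  }
}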
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java new file mode 100644 index 0000000..4e555d3 --- /dev/null +++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.streaming; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.util.CarbonMetadataUtil; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.format.FileHeader; +import org.apache.carbondata.processing.loading.BadRecordsLogger; +import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider; +import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; +import org.apache.carbondata.processing.loading.DataField; +import org.apache.carbondata.processing.loading.DataLoadProcessBuilder; +import org.apache.carbondata.processing.loading.converter.RowConverter; +import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.processing.loading.parser.RowParser; +import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl; +import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import 
org.apache.hadoop.mapreduce.TaskID; + +/** + * Stream record writer + */ +public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(CarbonStreamRecordWriter.class.getName()); + + // basic info + private Configuration hadoopConf; + private CarbonLoadModel carbonLoadModel; + private CarbonDataLoadConfiguration configuration; + private CarbonTable carbonTable; + private int maxRowNums; + private int maxCacheSize; + + // parser and converter + private RowParser rowParser; + private BadRecordsLogger badRecordLogger; + private RowConverter converter; + private CarbonRow currentRow = new CarbonRow(null); + + // encoder + private DataField[] dataFields; + private BitSet nullBitSet; + private boolean[] isNoDictionaryDimensionColumn; + private int dimensionWithComplexCount; + private int measureCount; + private DataType[] measureDataTypes; + private StreamBlockletWriter output = null; + + // data write + private String segmentDir; + private String fileName; + private DataOutputStream outputStream; + private boolean isFirstRow = true; + private boolean hasException = false; + + CarbonStreamRecordWriter(TaskAttemptContext job) throws IOException { + initialize(job); + } + + public CarbonStreamRecordWriter(TaskAttemptContext job, CarbonLoadModel carbonLoadModel) + throws IOException { + this.carbonLoadModel = carbonLoadModel; + initialize(job); + } + + private void initialize(TaskAttemptContext job) throws IOException { + // set basic information + hadoopConf = job.getConfiguration(); + if (carbonLoadModel == null) { + carbonLoadModel = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf); + if (carbonLoadModel == null) { + throw new IOException( + "CarbonStreamRecordWriter require configuration: mapreduce.output.carbon.load.model"); + } + } + String segmentId = CarbonStreamOutputFormat.getSegmentId(hadoopConf); + carbonLoadModel.setSegmentId(segmentId); + carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(); + long taskNo = TaskID.forName(hadoopConf.get("mapred.tip.id")).getId(); + carbonLoadModel.setTaskNo("" + taskNo); + configuration = DataLoadProcessBuilder.createConfiguration(carbonLoadModel); + maxRowNums = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS, + CarbonStreamOutputFormat.CARBON_STREAM_BLOCKLET_ROW_NUMS_DEFAULT) - 1; + maxCacheSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE, + CarbonStreamOutputFormat.CARBON_STREAM_CACHE_SIZE_DEFAULT); + + segmentDir = CarbonTablePath.getSegmentPath( + carbonTable.getAbsoluteTableIdentifier().getTablePath(), segmentId); + fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0"); + } + + private void initializeAtFirstRow() throws IOException, InterruptedException { + + // initialize metadata + isNoDictionaryDimensionColumn = + CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()); + dimensionWithComplexCount = configuration.getDimensionCount(); + measureCount = configuration.getMeasureCount(); + dataFields = configuration.getDataFields(); + measureDataTypes = new DataType[measureCount]; + for (int i = 0; i < measureCount; i++) { + measureDataTypes[i] = + dataFields[dimensionWithComplexCount + i].getColumn().getDataType(); + } + + // initialize parser and converter + rowParser = new RowParserImpl(dataFields, configuration); + badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration); + converter = new 
RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger); + configuration.setCardinalityFinder(converter); + converter.initialize(); + + // initialize encoder + nullBitSet = new BitSet(dataFields.length); + int rowBufferSize = hadoopConf.getInt(CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE, + CarbonStreamOutputFormat.CARBON_ENCODER_ROW_BUFFER_SIZE_DEFAULT); + output = new StreamBlockletWriter(maxCacheSize, maxRowNums, rowBufferSize); + + // initialize data writer + String filePath = segmentDir + File.separator + fileName; + FileFactory.FileType fileType = FileFactory.getFileType(filePath); + CarbonFile carbonFile = FileFactory.getCarbonFile(filePath, fileType); + if (carbonFile.exists()) { + // if the file is existed, use the append api + outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath, fileType); + } else { + // IF the file is not existed, use the create api + outputStream = FileFactory.getDataOutputStream(filePath, fileType); + writeFileHeader(); + } + + isFirstRow = false; + } + + @Override public void write(Void key, Object value) throws IOException, InterruptedException { + if (isFirstRow) { + initializeAtFirstRow(); + } + + // parse and convert row + currentRow.setData(rowParser.parseRow((Object[]) value)); + converter.convert(currentRow); + + // null bit set + nullBitSet.clear(); + for (int i = 0; i < dataFields.length; i++) { + if (null == currentRow.getObject(i)) { + nullBitSet.set(i); + } + } + output.nextRow(); + byte[] b = nullBitSet.toByteArray(); + output.writeShort(b.length); + if (b.length > 0) { + output.writeBytes(b); + } + int dimCount = 0; + Object columnValue; + + // primitive type dimension + for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) { + columnValue = currentRow.getObject(dimCount); + if (null != columnValue) { + if (isNoDictionaryDimensionColumn[dimCount]) { + byte[] col = (byte[]) columnValue; + output.writeShort(col.length); + output.writeBytes(col); + } else { + output.writeInt((int) columnValue); + } + } + } + // complex type dimension + for (; dimCount < dimensionWithComplexCount; dimCount++) { + columnValue = currentRow.getObject(dimCount); + if (null != columnValue) { + byte[] col = (byte[]) columnValue; + output.writeShort(col.length); + output.writeBytes(col); + } + } + // measure + DataType dataType; + for (int msrCount = 0; msrCount < measureCount; msrCount++) { + columnValue = currentRow.getObject(dimCount + msrCount); + if (null != columnValue) { + dataType = measureDataTypes[msrCount]; + if (dataType == DataTypes.BOOLEAN) { + output.writeBoolean((boolean) columnValue); + } else if (dataType == DataTypes.SHORT) { + output.writeShort((short) columnValue); + } else if (dataType == DataTypes.INT) { + output.writeInt((int) columnValue); + } else if (dataType == DataTypes.LONG) { + output.writeLong((long) columnValue); + } else if (dataType == DataTypes.DOUBLE) { + output.writeDouble((double) columnValue); + } else if (DataTypes.isDecimal(dataType)) { + BigDecimal val = (BigDecimal) columnValue; + byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val); + output.writeShort(bigDecimalInBytes.length); + output.writeBytes(bigDecimalInBytes); + } else { + String msg = + "unsupported data type:" + dataFields[dimCount + msrCount].getColumn().getDataType() + .getName(); + LOGGER.error(msg); + throw new IOException(msg); + } + } + } + + if (output.isFull()) { + appendBlockletToDataFile(); + } + } + + private void writeFileHeader() throws IOException { + List<ColumnSchema> 
wrapperColumnSchemaList = CarbonUtil + .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()), + carbonTable.getMeasureByTableName(carbonTable.getTableName())); + int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()]; + for (int i = 0; i < dimLensWithComplex.length; i++) { + dimLensWithComplex[i] = Integer.MAX_VALUE; + } + int[] dictionaryColumnCardinality = + CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList); + List<Integer> cardinality = new ArrayList<>(); + List<org.apache.carbondata.format.ColumnSchema> columnSchemaList = AbstractFactDataWriter + .getColumnSchemaListAndCardinality(cardinality, dictionaryColumnCardinality, + wrapperColumnSchemaList); + FileHeader fileHeader = + CarbonMetadataUtil.getFileHeader(true, columnSchemaList, System.currentTimeMillis()); + fileHeader.setIs_footer_present(false); + fileHeader.setIs_splitable(true); + fileHeader.setSync_marker(CarbonStreamOutputFormat.CARBON_SYNC_MARKER); + outputStream.write(CarbonUtil.getByteArray(fileHeader)); + } + + /** + * write a blocklet to file + */ + private void appendBlockletToDataFile() throws IOException { + if (output.getRowIndex() == -1) { + return; + } + output.apppendBlocklet(outputStream); + outputStream.flush(); + // reset data + output.reset(); + } + + @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { + try { + // append remain buffer data + if (!hasException && !isFirstRow) { + appendBlockletToDataFile(); + converter.finish(); + } + } finally { + // close resource + CarbonUtil.closeStreams(outputStream); + if (output != null) { + output.close(); + } + if (badRecordLogger != null) { + badRecordLogger.closeStreams(); + } + } + } + + public String getSegmentDir() { + return segmentDir; + } + + public String getFileName() { + return fileName; + } + + public void setHasException(boolean hasException) { + this.hasException = hasException; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java new file mode 100644 index 0000000..43fe6ed --- /dev/null +++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.streaming; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.format.BlockletHeader; + +/** + * stream blocklet reader + */ +public class StreamBlockletReader { + + private byte[] buffer; + private int offset; + private final byte[] syncMarker; + private final byte[] syncBuffer; + private final int syncLen; + private long pos = 0; + private final InputStream in; + private final long limitStart; + private final long limitEnd; + private boolean isAlreadySync = false; + private Compressor compressor = CompressorFactory.getInstance().getCompressor(); + private int rowNums = 0; + private int rowIndex = 0; + private boolean isHeaderPresent; + + StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, boolean isHeaderPresent) { + this.syncMarker = syncMarker; + syncLen = syncMarker.length; + syncBuffer = new byte[syncLen]; + this.in = in; + limitStart = limit; + limitEnd = limitStart + syncLen; + this.isHeaderPresent = isHeaderPresent; + } + + private void ensureCapacity(int capacity) { + if (buffer == null || capacity > buffer.length) { + buffer = new byte[capacity]; + } + } + + /** + * find the first position of sync_marker in input stream + */ + private boolean sync() throws IOException { + if (!readBytesFromStream(syncBuffer, 0, syncLen)) { + return false; + } + boolean skipHeader = false; + for (int i = 0; i < limitStart; i++) { + int j = 0; + for (; j < syncLen; j++) { + if (syncMarker[j] != syncBuffer[(i + j) % syncLen]) break; + } + if (syncLen == j) { + if (isHeaderPresent) { + if (skipHeader) { + return true; + } else { + skipHeader = true; + } + } else { + return true; + } + } + int value = in.read(); + if (-1 == value) { + return false; + } + syncBuffer[i % syncLen] = (byte) value; + pos++; + } + return false; + } + + BlockletHeader readBlockletHeader() throws IOException { + int len = readIntFromStream(); + byte[] b = new byte[len]; + if (!readBytesFromStream(b, 0, len)) { + throw new EOFException("Failed to read blocklet header"); + } + BlockletHeader header = CarbonUtil.readBlockletHeader(b); + rowNums = header.getBlocklet_info().getNum_rows(); + rowIndex = 0; + return header; + } + + void readBlockletData(BlockletHeader header) throws IOException { + ensureCapacity(header.getBlocklet_length()); + offset = 0; + int len = readIntFromStream(); + byte[] b = new byte[len]; + if (!readBytesFromStream(b, 0, len)) { + throw new EOFException("Failed to read blocklet data"); + } + compressor.rawUncompress(b, buffer); + } + + void skipBlockletData(boolean reset) throws IOException { + int len = readIntFromStream(); + skip(len); + pos += len; + if (reset) { + this.rowNums = 0; + this.rowIndex = 0; + } + } + + private void skip(int len) throws IOException { + long remaining = len; + do { + long skipLen = in.skip(remaining); + remaining -= skipLen; + } while (remaining > 0); + } + + /** + * find the next blocklet + */ + boolean nextBlocklet() throws IOException { + if (pos >= limitStart) { + return false; + } + if (isAlreadySync) { + if (!readBytesFromStream(syncBuffer, 0, syncLen)) { + return false; + } + } else { + isAlreadySync = true; + if (!sync()) { + return false; + } + } + + return pos < limitEnd; + } + + boolean hasNext() throws IOException { + return rowIndex < rowNums; + } 
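+ /** Advance the row cursor within the current blocklet; the column bytes themselves are consumed by the readXxx()/skipBytes() calls that follow. */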
+ + void nextRow() { + rowIndex++; + } + + int readIntFromStream() throws IOException { + int ch1 = in.read(); + int ch2 = in.read(); + int ch3 = in.read(); + int ch4 = in.read(); + if ((ch1 | ch2 | ch3 | ch4) < 0) throw new EOFException(); + pos += 4; + return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); + } + + /** + * Reads <code>len</code> bytes of data from the input stream into + * an array of bytes. + * @return <code>true</code> if reading data successfully, or + * <code>false</code> if there is no more data because the end of the stream has been reached. + */ + boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException { + int readLen = in.read(b, offset, len); + if (readLen < 0) { + return false; + } + pos += readLen; + if (readLen < len) { + return readBytesFromStream(b, offset + readLen, len - readLen); + } else { + return true; + } + } + + boolean readBoolean() { + return (buffer[offset++]) != 0; + } + + short readShort() { + short v = (short) ((buffer[offset + 1] & 255) + + ((buffer[offset]) << 8)); + offset += 2; + return v; + } + + byte[] copy(int len) { + byte[] b = new byte[len]; + System.arraycopy(buffer, offset, b, 0, len); + return b; + } + + int readInt() { + int v = ((buffer[offset + 3] & 255) + + ((buffer[offset + 2] & 255) << 8) + + ((buffer[offset + 1] & 255) << 16) + + ((buffer[offset]) << 24)); + offset += 4; + return v; + } + + long readLong() { + long v = ((long)(buffer[offset + 7] & 255)) + + ((long) (buffer[offset + 6] & 255) << 8) + + ((long) (buffer[offset + 5] & 255) << 16) + + ((long) (buffer[offset + 4] & 255) << 24) + + ((long) (buffer[offset + 3] & 255) << 32) + + ((long) (buffer[offset + 2] & 255) << 40) + + ((long) (buffer[offset + 1] & 255) << 48) + + ((long) (buffer[offset]) << 56); + offset += 8; + return v; + } + + double readDouble() { + return Double.longBitsToDouble(readLong()); + } + + byte[] readBytes(int len) { + byte[] b = new byte[len]; + System.arraycopy(buffer, offset, b, 0, len); + offset += len; + return b; + } + + void skipBytes(int len) { + offset += len; + } + + int getRowNums() { + return rowNums; + } + + void close() { + CarbonUtil.closeStreams(in); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java new file mode 100644 index 0000000..509e2aa --- /dev/null +++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.streaming; + +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.carbondata.core.datastore.compression.Compressor; +import org.apache.carbondata.core.datastore.compression.CompressorFactory; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.format.BlockletHeader; +import org.apache.carbondata.format.BlockletInfo; +import org.apache.carbondata.format.MutationType; + +/** + * stream blocklet writer + */ +public class StreamBlockletWriter { + private byte[] buffer; + private int maxSize; + private int maxRowNum; + private int rowSize; + private int count = 0; + private int rowIndex = -1; + private Compressor compressor = CompressorFactory.getInstance().getCompressor(); + + StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize) { + buffer = new byte[maxSize]; + this.maxSize = maxSize; + this.maxRowNum = maxRowNum; + this.rowSize = rowSize; + } + + private void ensureCapacity(int space) { + int newcount = space + count; + if (newcount > buffer.length) { + byte[] newbuf = new byte[Math.max(newcount, buffer.length + rowSize)]; + System.arraycopy(buffer, 0, newbuf, 0, count); + buffer = newbuf; + } + } + + void reset() { + count = 0; + rowIndex = -1; + } + + byte[] getBytes() { + return buffer; + } + + int getCount() { + return count; + } + + int getRowIndex() { + return rowIndex; + } + + void nextRow() { + rowIndex++; + } + + boolean isFull() { + return rowIndex == maxRowNum || count >= maxSize; + } + + void writeBoolean(boolean val) { + ensureCapacity(1); + buffer[count] = (byte) (val ? 1 : 0); + count += 1; + } + + void writeShort(int val) { + ensureCapacity(2); + buffer[count + 1] = (byte) (val); + buffer[count] = (byte) (val >>> 8); + count += 2; + } + + void writeInt(int val) { + ensureCapacity(4); + buffer[count + 3] = (byte) (val); + buffer[count + 2] = (byte) (val >>> 8); + buffer[count + 1] = (byte) (val >>> 16); + buffer[count] = (byte) (val >>> 24); + count += 4; + } + + void writeLong(long val) { + ensureCapacity(8); + buffer[count + 7] = (byte) (val); + buffer[count + 6] = (byte) (val >>> 8); + buffer[count + 5] = (byte) (val >>> 16); + buffer[count + 4] = (byte) (val >>> 24); + buffer[count + 3] = (byte) (val >>> 32); + buffer[count + 2] = (byte) (val >>> 40); + buffer[count + 1] = (byte) (val >>> 48); + buffer[count] = (byte) (val >>> 56); + count += 8; + } + + void writeDouble(double val) { + writeLong(Double.doubleToLongBits(val)); + } + + void writeBytes(byte[] b) { + writeBytes(b, 0, b.length); + } + + void writeBytes(byte[] b, int off, int len) { + ensureCapacity(len); + System.arraycopy(b, off, buffer, count, len); + count += len; + } + + void apppendBlocklet(DataOutputStream outputStream) throws IOException { + outputStream.write(CarbonStreamOutputFormat.CARBON_SYNC_MARKER); + + BlockletInfo blockletInfo = new BlockletInfo(); + blockletInfo.setNum_rows(getRowIndex() + 1); + BlockletHeader blockletHeader = new BlockletHeader(); + blockletHeader.setBlocklet_length(getCount()); + blockletHeader.setMutation(MutationType.INSERT); + blockletHeader.setBlocklet_info(blockletInfo); + byte[] headerBytes = CarbonUtil.getByteArray(blockletHeader); + outputStream.writeInt(headerBytes.length); + outputStream.write(headerBytes); + + byte[] compressed = compressor.compressByte(getBytes(), getCount()); + outputStream.writeInt(compressed.length); + outputStream.write(compressed); + } + + void close() { + } +} 
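Taken together, StreamBlockletWriter.apppendBlocklet() above and StreamBlockletReader.readBlockletHeader()/readBlockletData() earlier define the on-disk blocklet framing: the sync marker, a 4-byte big-endian length plus the Thrift-encoded BlockletHeader, then a 4-byte length plus the compressor output of the row buffer. Below is a minimal round-trip sketch of just that framing, assuming a plain byte[] in place of the Thrift header and no compression (both simplifications; the class and method names are illustrative only).

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

/** Illustrative round trip of the stream blocklet framing (no Thrift, no compression). */
public final class BlockletFramingSketch {

  static void writeBlocklet(DataOutputStream out, byte[] syncMarker, byte[] headerBytes,
      byte[] payload) throws IOException {
    out.write(syncMarker);            // sync marker delimits blocklets so a reader can resync
    out.writeInt(headerBytes.length); // 4-byte big-endian length of the (Thrift) header
    out.write(headerBytes);
    out.writeInt(payload.length);     // 4-byte length of the (compressed) row buffer
    out.write(payload);
  }

  static byte[][] readBlocklet(DataInputStream in, int syncLen) throws IOException {
    in.skipBytes(syncLen);            // assume the caller has already located the sync marker
    byte[] header = new byte[in.readInt()];
    in.readFully(header);
    byte[] payload = new byte[in.readInt()];
    in.readFully(payload);
    return new byte[][] { header, payload };
  }

  public static void main(String[] args) throws IOException {
    byte[] sync = "@carbondata_sync".getBytes(); // stand-in for CARBON_SYNC_MARKER
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    writeBlocklet(new DataOutputStream(bos), sync, "header".getBytes(), "rows".getBytes());
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
    byte[][] parts = readBlocklet(in, sync.length);
    System.out.println(Arrays.equals(parts[1], "rows".getBytes())); // prints true
  }
}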
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java index 8c9889d..9e83924 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java @@ -42,8 +42,8 @@ import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonIndexFileWriter; import org.apache.carbondata.format.BlockIndex; import org.apache.carbondata.format.BlockletIndex; -import org.apache.carbondata.hadoop.streaming.CarbonStreamRecordWriter; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.streaming.CarbonStreamRecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala deleted file mode 100644 index 6d1fa45..0000000 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.streaming - -import org.apache.spark.scheduler.SparkListener -import org.apache.spark.scheduler.SparkListenerApplicationEnd - -class CarbonSparkStreamingListener extends SparkListener { - - /** - * When Spark Streaming App stops, remove all locks for stream table. 
- */ - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { - CarbonStreamSparkStreaming.cleanAllLockAfterStop() - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala deleted file mode 100644 index 4aa1517..0000000 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.streaming - -import java.util - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.conf.Configuration -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink -import org.apache.spark.sql.execution.streaming.Sink -import org.apache.spark.sql.SaveMode -import org.apache.spark.sql.SparkSession -import org.apache.spark.streaming.Time - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage} -import org.apache.carbondata.core.metadata.schema.table.CarbonTable - -/** - * Interface used to write stream data to stream table - * when integrate with Spark Streaming. - * - * NOTE: Current integration with Spark Streaming is an alpha feature. - */ -class CarbonStreamSparkStreamingWriter(val sparkSession: SparkSession, - val carbonTable: CarbonTable, - val configuration: Configuration) { - - private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - private var isInitialize: Boolean = false - - private var lock: ICarbonLock = null - private var carbonAppendableStreamSink: Sink = null - - /** - * Acquired the lock for stream table - */ - def lockStreamTable(): Unit = { - lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, - LockUsage.STREAMING_LOCK) - if (lock.lockWithRetries()) { - LOGGER.info("Acquired the lock for stream table: " + - carbonTable.getDatabaseName + "." + - carbonTable.getTableName) - } else { - LOGGER.error("Not able to acquire the lock for stream table:" + - carbonTable.getDatabaseName + "." + carbonTable.getTableName) - throw new InterruptedException( - "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." 
+ - carbonTable.getTableName) - } - } - - /** - * unlock for stream table - */ - def unLockStreamTable(): Unit = { - if (null != lock) { - lock.unlock() - LOGGER.info("unlock for stream table: " + - carbonTable.getDatabaseName + "." + - carbonTable.getTableName) - } - } - - def initialize(): Unit = { - carbonAppendableStreamSink = StreamSinkFactory.createStreamTableSink( - sparkSession, - configuration, - carbonTable, - extraOptions.toMap).asInstanceOf[CarbonAppendableStreamSink] - - lockStreamTable() - - isInitialize = true - } - - def writeStreamData(dataFrame: DataFrame, time: Time): Unit = { - if (!isInitialize) { - initialize() - } - carbonAppendableStreamSink.addBatch(time.milliseconds, dataFrame) - } - - private val extraOptions = new scala.collection.mutable.HashMap[String, String] - private var mode: SaveMode = SaveMode.ErrorIfExists - - this.option("dbName", carbonTable.getDatabaseName) - this.option("tableName", carbonTable.getTableName) - - /** - * Specifies the behavior when data or table already exists. Options include: - * - `SaveMode.Overwrite`: overwrite the existing data. - * - `SaveMode.Append`: append the data. - * - `SaveMode.Ignore`: ignore the operation (i.e. no-op). - * - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime. - */ - def mode(saveMode: SaveMode): CarbonStreamSparkStreamingWriter = { - if (mode == SaveMode.ErrorIfExists) { - mode = saveMode - } - this - } - - /** - * Specifies the behavior when data or table already exists. Options include: - * - `overwrite`: overwrite the existing data. - * - `append`: append the data. - * - `ignore`: ignore the operation (i.e. no-op). - * - `error or default`: default option, throw an exception at runtime. - */ - def mode(saveMode: String): CarbonStreamSparkStreamingWriter = { - if (mode == SaveMode.ErrorIfExists) { - mode = saveMode.toLowerCase(util.Locale.ROOT) match { - case "overwrite" => SaveMode.Overwrite - case "append" => SaveMode.Append - case "ignore" => SaveMode.Ignore - case "error" | "default" => SaveMode.ErrorIfExists - case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " + - "Accepted save modes are 'overwrite', 'append', 'ignore', 'error'.") - } - } - this - } - - /** - * Adds an output option - */ - def option(key: String, value: String): CarbonStreamSparkStreamingWriter = { - if (!extraOptions.contains(key)) { - extraOptions += (key -> value) - } - this - } - - /** - * Adds an output option - */ - def option(key: String, value: Boolean): CarbonStreamSparkStreamingWriter = - option(key, value.toString) - - /** - * Adds an output option - */ - def option(key: String, value: Long): CarbonStreamSparkStreamingWriter = - option(key, value.toString) - - /** - * Adds an output option - */ - def option(key: String, value: Double): CarbonStreamSparkStreamingWriter = - option(key, value.toString) -} - -object CarbonStreamSparkStreaming { - - @transient private val tableMap = - new util.HashMap[String, CarbonStreamSparkStreamingWriter]() - - def getTableMap: util.Map[String, CarbonStreamSparkStreamingWriter] = tableMap - - /** - * remove all stream lock. 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
deleted file mode 100644
index 4df04b9..0000000
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.streaming
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.Date
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType}
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.SparkSession
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.Segment
-import org.apache.carbondata.core.datastore.block.SegmentProperties
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
-import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
-import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
-import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
-import org.apache.carbondata.spark.rdd.CarbonRDD
-import org.apache.carbondata.spark.util.CommonUtil
-
-
-/**
- * partition of the handoff segment
- */
-class HandoffPartition(
-    val rddId: Int,
-    val idx: Int,
-    @transient val inputSplit: CarbonInputSplit
-) extends Partition {
-
-  val split = new SerializableWritable[CarbonInputSplit](inputSplit)
-
-  override val index: Int = idx
-
-  override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
-
-/**
- * wrap the record reader of the handoff segment as a RawResultIterator
- */
-class StreamingRawResultIterator(
-    recordReader: CarbonStreamRecordReader
-) extends RawResultIterator(null, null, null) {
-
-  override def hasNext: Boolean = {
-    recordReader.nextKeyValue()
-  }
-
-  override def next(): Array[Object] = {
-    val rowTmp = recordReader
-      .getCurrentValue
-      .asInstanceOf[GenericInternalRow]
-      .values
-      .asInstanceOf[Array[Object]]
-    val row = new Array[Object](rowTmp.length)
-    System.arraycopy(rowTmp, 0, row, 0, rowTmp.length)
-    row
-  }
-}
-
-/**
- * execute streaming segment handoff
- */
-class StreamHandoffRDD[K, V](
-    sc: SparkContext,
-    result: HandoffResult[K, V],
-    carbonLoadModel: CarbonLoadModel,
-    handOffSegmentId: String
-) extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
-
-  private val jobTrackerId: String = {
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    formatter.format(new Date())
-  }
-
-  override def internalCompute(
-      split: Partition,
-      context: TaskContext
-  ): Iterator[(K, V)] = {
-    carbonLoadModel.setTaskNo("" + split.index)
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    CarbonMetadata.getInstance().addCarbonTable(carbonTable)
-    // the input iterator is using raw rows
-    val iteratorList = prepareInputIterator(split, carbonTable)
-
-    CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, true, false)
-    // use CompactionResultSortProcessor to sort data and write to columnar files
-    val processor = prepareHandoffProcessor(carbonTable)
-    val status = processor.execute(iteratorList)
-
-    new Iterator[(K, V)] {
-      private var finished = false
-
-      override def hasNext: Boolean = {
-        !finished
-      }
-
-      override def next(): (K, V) = {
-        finished = true
-        result.getKey("" + split.index, status)
-      }
-    }
-  }
-
-  /**
-   * prepare the input iterator based on CarbonStreamRecordReader
-   */
-  private def prepareInputIterator(
-      split: Partition,
-      carbonTable: CarbonTable
-  ): util.ArrayList[RawResultIterator] = {
-    val inputSplit = split.asInstanceOf[HandoffPartition].split.value
-    val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
-    val hadoopConf = new Configuration()
-    CarbonInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName)
-    CarbonInputFormat.setTableName(hadoopConf, carbonTable.getTableName)
-    CarbonInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath)
-    val projection = new CarbonProjection
-    val dataFields = carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName)
-    (0 until dataFields.size()).foreach { index =>
-      projection.addColumn(dataFields.get(index).getColName)
-    }
-    CarbonInputFormat.setColumnProjection(hadoopConf, projection)
-    CarbonInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo)
-    val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
-    val format = new CarbonTableInputFormat[Array[Object]]()
-    val model = format.createQueryModel(inputSplit, attemptContext)
-    val inputFormat = new CarbonStreamInputFormat
-    val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext)
-      .asInstanceOf[CarbonStreamRecordReader]
-    streamReader.setVectorReader(false)
-    streamReader.setQueryModel(model)
-    streamReader.setUseRawRow(true)
-    streamReader.initialize(inputSplit, attemptContext)
-    val iteratorList = new util.ArrayList[RawResultIterator](1)
-    iteratorList.add(new StreamingRawResultIterator(streamReader))
-    iteratorList
-  }
-
-  private def prepareHandoffProcessor(
-      carbonTable: CarbonTable
-  ): CompactionResultSortProcessor = {
-    val wrapperColumnSchemaList = CarbonUtil.getColumnSchemaList(
-      carbonTable.getDimensionByTableName(carbonTable.getTableName),
-      carbonTable.getMeasureByTableName(carbonTable.getTableName))
-    val dimLensWithComplex =
-      (0 until wrapperColumnSchemaList.size()).map(_ => Integer.MAX_VALUE).toArray
-    val dictionaryColumnCardinality =
-      CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList)
-    val segmentProperties =
-      new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality)
-
-    new CompactionResultSortProcessor(
-      carbonLoadModel,
-      carbonTable,
-      segmentProperties,
-      CompactionType.STREAMING,
-      carbonTable.getTableName,
-      null
-    )
-  }
-
-  /**
-   * get the partitions of the handoff segment
-   */
-  override protected def getPartitions: Array[Partition] = {
-    val job = Job.getInstance(FileFactory.getConfiguration)
-    val inputFormat = new CarbonTableInputFormat[Array[Object]]()
-    val segmentList = new util.ArrayList[Segment](1)
-    segmentList.add(Segment.toSegment(handOffSegmentId))
-    val splits = inputFormat.getSplitsOfStreaming(
-      job,
-      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier,
-      segmentList
-    )
-
-    (0 until splits.size()).map { index =>
-      new HandoffPartition(id, index, splits.get(index).asInstanceOf[CarbonInputSplit])
-    }.toArray[Partition]
-  }
-}
-
-object StreamHandoffRDD {
-
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  def iterateStreamingHandoff(
-      carbonLoadModel: CarbonLoadModel,
-      sparkSession: SparkSession
-  ): Unit = {
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val identifier = carbonTable.getAbsoluteTableIdentifier
-    var continueHandoff = false
-    // require handoff lock on table
-    val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK)
-    try {
-      if (lock.lockWithRetries()) {
-        LOGGER.info("Acquired the handoff lock for table" +
-                    s" ${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }")
-        // handoff streaming segments one by one
-        do {
-          val segmentStatusManager = new SegmentStatusManager(identifier)
-          var loadMetadataDetails: Array[LoadMetadataDetails] = null
-          // lock table to read table status file
-          val statusLock = segmentStatusManager.getTableStatusLock
-          try {
-            if (statusLock.lockWithRetries()) {
-              loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
-                CarbonTablePath.getMetadataPath(identifier.getTablePath))
-            }
-          } finally {
-            if (null != statusLock) {
-              statusLock.unlock()
-            }
-          }
-          if (null != loadMetadataDetails) {
-            val streamSegments =
-              loadMetadataDetails.filter(_.getSegmentStatus == SegmentStatus.STREAMING_FINISH)
-
-            continueHandoff = streamSegments.length > 0
-            if (continueHandoff) {
-              // handoff a streaming segment
-              val loadMetadataDetail = streamSegments(0)
-              executeStreamingHandoff(
-                carbonLoadModel,
-                sparkSession,
-                loadMetadataDetail.getLoadName
-              )
-            }
-          } else {
-            continueHandoff = false
-          }
-        } while (continueHandoff)
-      }
-    } finally {
-      if (null != lock) {
-        lock.unlock()
-      }
-    }
-  }
-
-  /**
-   * start a new thread to execute the stream segment handoff
-   */
-  def startStreamingHandoffThread(
-      carbonLoadModel: CarbonLoadModel,
-      sparkSession: SparkSession,
-      isDDL: Boolean
-  ): Unit = {
-    if (isDDL) {
-      iterateStreamingHandoff(carbonLoadModel, sparkSession)
-    } else {
-      // start a new thread to execute the streaming segment handoff
-      val handoffThread = new Thread() {
-        override def run(): Unit = {
-          iterateStreamingHandoff(carbonLoadModel, sparkSession)
-        }
-      }
-      handoffThread.start()
-    }
-  }
-
-  /**
-   * invoke StreamHandoffRDD to hand off a streaming segment to a columnar segment
-   */
-  def executeStreamingHandoff(
-      carbonLoadModel: CarbonLoadModel,
-      sparkSession: SparkSession,
-      handoffSegmentId: String
-  ): Unit = {
-    var loadStatus = SegmentStatus.SUCCESS
-    var errorMessage: String = "Handoff failure"
-    try {
-      // generate a new columnar segment
-      val newMetaEntry = new LoadMetadataDetails
-      carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-      CarbonLoaderUtil.populateNewLoadMetaEntry(
-        newMetaEntry,
-        SegmentStatus.INSERT_IN_PROGRESS,
-        carbonLoadModel.getFactTimeStamp,
-        false)
-      val operationContext = new OperationContext()
-      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
-        new LoadTablePreStatusUpdateEvent(
-          carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier,
-          carbonLoadModel)
-      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
-
-      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
-      val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
-        new LoadTablePostStatusUpdateEvent(carbonLoadModel)
-      OperationListenerBus.getInstance()
-        .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
-      // convert the streaming segment to a columnar segment
-      val status = new StreamHandoffRDD(
-        sparkSession.sparkContext,
-        new HandoffResultImpl(),
-        carbonLoadModel,
-        handoffSegmentId).collect()
-
-      status.foreach { x =>
-        if (!x._2) {
-          loadStatus = SegmentStatus.LOAD_FAILURE
-        }
-      }
-    } catch {
-      case ex: Exception =>
-        loadStatus = SegmentStatus.LOAD_FAILURE
-        errorMessage = errorMessage + ": " + ex.getCause.getMessage
-        LOGGER.error(errorMessage)
-        LOGGER.error(ex, s"Handoff failed on streaming segment $handoffSegmentId")
-    }
-
-    if (loadStatus == SegmentStatus.LOAD_FAILURE) {
-      CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
-      LOGGER.info("********starting clean up**********")
-      CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
-      LOGGER.info("********clean up done**********")
-      LOGGER.audit(s"Handoff failed for " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-      LOGGER.error("Cannot write load metadata file as handoff failed")
-      throw new Exception(errorMessage)
-    }
-
-    if (loadStatus == SegmentStatus.SUCCESS) {
-      val done = updateLoadMetadata(handoffSegmentId, carbonLoadModel)
-      if (!done) {
-        val errorMessage = "Handoff failed due to failure in table status update."
- LOGGER.audit("Handoff is failed for " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") - LOGGER.error("Handoff failed due to failure in table status updation.") - throw new Exception(errorMessage) - } - done - } - - } - - /** - * update streaming segment and new columnar segment - */ - private def updateLoadMetadata( - handoffSegmentId: String, - loadModel: CarbonLoadModel - ): Boolean = { - var status = false - val metaDataFilepath = loadModel.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath - val identifier = loadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier - val metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath) - val fileType = FileFactory.getFileType(metadataPath) - if (!FileFactory.isFileExist(metadataPath, fileType)) { - FileFactory.mkdirs(metadataPath, fileType) - } - val tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath) - val segmentStatusManager = new SegmentStatusManager(identifier) - val carbonLock = segmentStatusManager.getTableStatusLock - try { - if (carbonLock.lockWithRetries()) { - LOGGER.info( - "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName() - + " for table status updation") - val listOfLoadFolderDetailsArray = - SegmentStatusManager.readLoadMetadata(metaDataFilepath) - - // update new columnar segment to success status - val newSegment = - listOfLoadFolderDetailsArray.find(_.getLoadName.equals(loadModel.getSegmentId)) - if (newSegment.isEmpty) { - throw new Exception("Failed to update table status for new segment") - } else { - newSegment.get.setSegmentStatus(SegmentStatus.SUCCESS) - newSegment.get.setLoadEndTime(System.currentTimeMillis()) - } - - // update streaming segment to compacted status - val streamSegment = - listOfLoadFolderDetailsArray.find(_.getLoadName.equals(handoffSegmentId)) - if (streamSegment.isEmpty) { - throw new Exception("Failed to update table status for streaming segment") - } else { - streamSegment.get.setSegmentStatus(SegmentStatus.COMPACTED) - streamSegment.get.setMergedLoadName(loadModel.getSegmentId) - } - - // refresh table status file - SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray) - status = true - } else { - LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel - .getDatabaseName() + "." + loadModel.getTableName()) - } - } finally { - if (carbonLock.unlock()) { - LOGGER.info("Table unlocked successfully after table status updation" + - loadModel.getDatabaseName() + "." + loadModel.getTableName()) - } else { - LOGGER.error("Unable to unlock Table lock for table" + loadModel.getDatabaseName() + - "." + loadModel.getTableName() + " during table status updation") - } - } - status - } -}
