This is an automated email from the ASF dual-hosted git repository.

manhua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 1cec1ee [CARBONDATA-3889] Cleanup code for carbondata-hadoop module 1cec1ee is described below commit 1cec1ee7268cc18a98d8894119b38dc8df60644a Author: QiangCai <qiang...@qq.com> AuthorDate: Tue Jul 7 15:48:10 2020 +0800 [CARBONDATA-3889] Cleanup code for carbondata-hadoop module Why is this PR needed? need cleanup code for carbondata-hadoop module What changes were proposed in this PR? Cleanup code for carbondata-hadoop module Does this PR introduce any user interface change? No Is any new testcase added? No This closes #3827 --- .../carbondata/hadoop/AbstractRecordReader.java | 4 +- .../carbondata/hadoop/CarbonMultiBlockSplit.java | 4 +- .../apache/carbondata/hadoop/CarbonProjection.java | 7 +- .../carbondata/hadoop/CarbonRecordReader.java | 6 +- .../hadoop/api/CarbonFileInputFormat.java | 58 +++----- .../carbondata/hadoop/api/CarbonInputFormat.java | 71 ++++------ .../hadoop/api/CarbonOutputCommitter.java | 33 ++--- .../hadoop/api/CarbonTableInputFormat.java | 25 +--- .../hadoop/api/CarbonTableOutputFormat.java | 65 ++++----- .../hadoop/stream/CarbonStreamInputFormat.java | 4 +- .../hadoop/stream/CarbonStreamUtils.java | 4 +- .../hadoop/stream/StreamBlockletReader.java | 6 +- .../hadoop/stream/StreamRecordReader.java | 26 ++-- .../carbondata/hadoop/testutil/StoreCreator.java | 150 +++++---------------- .../hadoop/util/CarbonInputFormatUtil.java | 31 +---- .../hadoop/util/CarbonInputSplitTaskInfo.java | 40 +++--- .../hadoop/util/CarbonVectorizedRecordReader.java | 12 +- .../carbondata/spark/rdd/CarbonScanRDD.scala | 2 +- .../datasources/SparkCarbonTableFormat.scala | 2 +- 19 files changed, 176 insertions(+), 374 deletions(-) diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java index 5923ab7..15898d3 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java @@ -31,9 +31,7 @@ public abstract class AbstractRecordReader<T> extends RecordReader<Void, T> { protected int rowCount = 0; /** - * This method will log query result count and querytime - * @param recordCount - * @param recorder + * This method will log query result count and query time */ public void logStatistics(int recordCount, QueryStatisticsRecorder recorder) { // result size diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java index 64901cf..f6a11a2 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java @@ -139,8 +139,8 @@ public class CarbonMultiBlockSplit extends InputSplit implements Serializable, W } getLocationIfNull(); out.writeInt(locations.length); - for (int i = 0; i < locations.length; i++) { - out.writeUTF(locations[i]); + for (String location : locations) { + out.writeUTF(location); } out.writeInt(fileFormat.ordinal()); } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java index 13ccaa9..12c03c0 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java @@ -18,6 +18,7 @@ package 
org.apache.carbondata.hadoop; import java.io.Serializable; +import java.util.Arrays; import java.util.LinkedHashSet; import java.util.Objects; import java.util.Set; @@ -29,16 +30,14 @@ public class CarbonProjection implements Serializable { private static final long serialVersionUID = -4328676723039530713L; - private Set<String> columns = new LinkedHashSet<>(); + private final Set<String> columns = new LinkedHashSet<>(); public CarbonProjection() { } public CarbonProjection(String[] columnNames) { Objects.requireNonNull(columnNames); - for (String columnName : columnNames) { - columns.add(columnName); - } + columns.addAll(Arrays.asList(columnNames)); } public void addColumn(String column) { diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java index 943bd76..2c5c821 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java @@ -43,13 +43,13 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; */ public class CarbonRecordReader<T> extends AbstractRecordReader<T> { - protected QueryModel queryModel; + protected final QueryModel queryModel; - protected CarbonReadSupport<T> readSupport; + protected final CarbonReadSupport<T> readSupport; protected CarbonIterator<Object[]> carbonIterator; - protected QueryExecutor queryExecutor; + protected final QueryExecutor queryExecutor; private InputMetricsStats inputMetricsStats; /** diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java index 5568a29..9c599ef 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java @@ -20,7 +20,6 @@ package org.apache.carbondata.hadoop.api; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.LinkedList; import java.util.List; @@ -28,7 +27,6 @@ import java.util.List; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; -import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.index.IndexFilter; import org.apache.carbondata.core.index.Segment; @@ -94,12 +92,9 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se } /** - * {@inheritDoc} - * Configurations FileInputFormat.INPUT_DIR - * are used to get table path to read. 
- * - * @param job - * @return List<InputSplit> list of CarbonInputSplit + * get list of block/blocklet and make them to CarbonInputSplit + * @param job JobContext with Configuration + * @return list of CarbonInputSplit * @throws IOException */ @Override @@ -114,7 +109,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se // check for externalTable segment (Segment_null) // process and resolve the expression - ReadCommittedScope readCommittedScope = null; + ReadCommittedScope readCommittedScope; if (carbonTable.isTransactionalTable()) { readCommittedScope = new LatestFilesReadCommittedScope( identifier.getTablePath() + "/Fact/Part0/Segment_null/", job.getConfiguration()); @@ -131,7 +126,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se IndexFilter filter = getFilterPredicates(job.getConfiguration()); // if external table Segments are found, add it to the List - List<Segment> externalTableSegments = new ArrayList<Segment>(); + List<Segment> externalTableSegments = new ArrayList<>(); Segment seg; if (carbonTable.isTransactionalTable()) { // SDK some cases write into the Segment Path instead of Table Path i.e. inside @@ -145,9 +140,9 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se for (LoadMetadataDetails load : loadMetadataDetails) { seg = new Segment(load.getLoadName(), null, readCommittedScope); if (fileLists != null) { - for (int i = 0; i < fileLists.size(); i++) { + for (Object fileList : fileLists) { String timestamp = - CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileLists.get(i).toString()); + CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileList.toString()); if (timestamp.equals(seg.getSegmentNo())) { externalTableSegments.add(seg); break; @@ -170,7 +165,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se // do block filtering and get split splits = getSplits(job, filter, externalTableSegments); } else { - List<CarbonFile> carbonFiles = null; + List<CarbonFile> carbonFiles; if (null != this.fileLists) { carbonFiles = getAllCarbonDataFiles(this.fileLists); } else { @@ -191,13 +186,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se info.setUseMinMaxForPruning(false); splits.add(split); } - Collections.sort(splits, new Comparator<InputSplit>() { - @Override - public int compare(InputSplit o1, InputSplit o2) { - return ((CarbonInputSplit) o1).getFilePath() - .compareTo(((CarbonInputSplit) o2).getFilePath()); - } - }); + splits.sort(Comparator.comparing(o -> ((CarbonInputSplit) o).getFilePath())); } setAllColumnProjectionIfNotConfigured(job, carbonTable); return splits; @@ -215,12 +204,8 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se private List<CarbonFile> getAllCarbonDataFiles(String tablePath) { List<CarbonFile> carbonFiles; try { - carbonFiles = FileFactory.getCarbonFile(tablePath).listFiles(true, new CarbonFileFilter() { - @Override - public boolean accept(CarbonFile file) { - return file.getName().endsWith(CarbonTablePath.CARBON_DATA_EXT); - } - }); + carbonFiles = FileFactory.getCarbonFile(tablePath).listFiles(true, + file -> file.getName().endsWith(CarbonTablePath.CARBON_DATA_EXT)); } catch (IOException e) { throw new RuntimeException(e); } @@ -228,10 +213,10 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se } private List<CarbonFile> getAllCarbonDataFiles(List fileLists) { - List<CarbonFile> carbonFiles = new 
LinkedList<CarbonFile>(); + List<CarbonFile> carbonFiles = new LinkedList<>(); try { - for (int i = 0; i < fileLists.size(); i++) { - carbonFiles.add(FileFactory.getCarbonFile(fileLists.get(i).toString())); + for (Object fileList : fileLists) { + carbonFiles.add(FileFactory.getCarbonFile(fileList.toString())); } } catch (Exception e) { throw new RuntimeException(e); @@ -247,20 +232,13 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se * @return * @throws IOException */ - private List<InputSplit> getSplits( - JobContext job, - IndexFilter indexFilter, + private List<InputSplit> getSplits(JobContext job, IndexFilter indexFilter, List<Segment> validSegments) throws IOException { - numSegments = validSegments.size(); - List<InputSplit> result = new LinkedList<InputSplit>(); - // for each segment fetch blocks matching filter in Driver BTree - List<CarbonInputSplit> dataBlocksOfSegment = - getDataBlocksOfSegment(job, carbonTable, indexFilter, validSegments, - new ArrayList<Segment>(), new ArrayList<String>()); + List<CarbonInputSplit> dataBlocksOfSegment = getDataBlocksOfSegment(job, carbonTable, + indexFilter, validSegments, new ArrayList<>(), new ArrayList<>()); numBlocks = dataBlocksOfSegment.size(); - result.addAll(dataBlocksOfSegment); - return result; + return new LinkedList<>(dataBlocksOfSegment); } } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index 5c9e5e1..130e0d9 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -126,7 +126,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { protected int numSegments = 0; protected int numStreamSegments = 0; protected int numStreamFiles = 0; - protected int hitedStreamFiles = 0; + protected int hitStreamFiles = 0; protected int numBlocks = 0; protected List fileLists = null; @@ -144,8 +144,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { return numStreamFiles; } - public int getHitedStreamFiles() { - return hitedStreamFiles; + public int getHitStreamFiles() { + return hitStreamFiles; } public int getNumBlocks() { @@ -259,24 +259,13 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { if (projection == null || projection.isEmpty()) { return; } - String[] allColumns = projection.getAllColumns(); - StringBuilder builder = new StringBuilder(); - for (String column : allColumns) { - builder.append(column).append(","); - } - String columnString = builder.toString(); - columnString = columnString.substring(0, columnString.length() - 1); - configuration.set(COLUMN_PROJECTION, columnString); + setColumnProjection(configuration, projection.getAllColumns()); } public static String getColumnProjection(Configuration configuration) { return configuration.get(COLUMN_PROJECTION); } - public static void setFgIndexPruning(Configuration configuration, boolean enable) { - configuration.set(FG_INDEX_PRUNING, String.valueOf(enable)); - } - public static boolean isFgIndexPruningEnable(Configuration configuration) { String enable = configuration.get(FG_INDEX_PRUNING); @@ -390,12 +379,9 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { } /** - * {@inheritDoc} - * Configurations FileInputFormat.INPUT_DIR - * are used to get table path to read. 
- * - * @param job - * @return List<InputSplit> list of CarbonInputSplit + * get list of block/blocklet and make them to CarbonInputSplit + * @param job JobContext with Configuration + * @return list of CarbonInputSplit * @throws IOException */ @Override @@ -409,7 +395,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { Long getDistributedCount(CarbonTable table, List<PartitionSpec> partitionNames, List<Segment> validSegments) { IndexInputFormat indexInputFormat = - new IndexInputFormat(table, null, validSegments, new ArrayList<String>(), + new IndexInputFormat(table, null, validSegments, new ArrayList<>(), partitionNames, false, null, false, false); indexInputFormat.setIsWriteToFile(false); try { @@ -497,7 +483,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { // matchedPartitions variable will be null in two cases as follows // 1. the table is not a partition table // 2. the table is a partition table, and all partitions are matched by query - // for partition table, the task id of carbaondata file name is the partition id. + // for partition table, the task id of carbondata file name is the partition id. // if this partition is not required, here will skip it. resultFilteredBlocks.add(blocklet.getInputSplit()); } @@ -512,11 +498,11 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { * get number of block by counting distinct file path of blocklets */ private int getBlockCount(List<ExtendedBlocklet> blocklets) { - Set<String> filepaths = new HashSet<>(); + Set<String> filePaths = new HashSet<>(); for (ExtendedBlocklet blocklet: blocklets) { - filepaths.add(blocklet.getPath()); + filePaths.add(blocklet.getPath()); } - return filepaths.size(); + return filePaths.size(); } /** @@ -537,7 +523,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration()); // First prune using default index on driver side. TableIndex defaultIndex = IndexStoreManager.getInstance().getDefaultIndex(carbonTable); - List<ExtendedBlocklet> prunedBlocklets = null; + List<ExtendedBlocklet> prunedBlocklets; // This is to log the event, so user will know what is happening by seeing logs. LOG.info("Started block pruning ..."); boolean isDistributedPruningEnabled = CarbonProperties.getInstance() @@ -573,7 +559,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { IndexChooser chooser = new IndexChooser(getOrCreateCarbonTable(job.getConfiguration())); - // Get the available CG indexs and prune further. + // Get the available CG indexes and prune further. 
IndexExprWrapper cgIndexExprWrapper = chooser.chooseCGIndex(filter.getResolver()); if (cgIndexExprWrapper != null) { @@ -586,7 +572,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { if (distributedCG && indexJob != null) { cgPrunedBlocklets = IndexUtil .executeIndexJob(carbonTable, filter.getResolver(), indexJob, partitionsToPrune, - segmentIds, invalidSegments, IndexLevel.CG, new ArrayList<String>()); + segmentIds, invalidSegments, IndexLevel.CG, new ArrayList<>()); } else { cgPrunedBlocklets = cgIndexExprWrapper.prune(segmentIds, partitionsToPrune); } @@ -620,10 +606,9 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { // Prune segments from already pruned blocklets IndexUtil.pruneSegments(segmentIds, prunedBlocklets); // Prune segments from already pruned blocklets - fgPrunedBlocklets = IndexUtil - .executeIndexJob(carbonTable, filter.getResolver(), indexJob, partitionsToPrune, - segmentIds, invalidSegments, fgIndexExprWrapper.getIndexLevel(), - new ArrayList<String>()); + fgPrunedBlocklets = IndexUtil.executeIndexJob( + carbonTable, filter.getResolver(), indexJob, partitionsToPrune, segmentIds, + invalidSegments, fgIndexExprWrapper.getIndexLevel(), new ArrayList<>()); // note that the 'fgPrunedBlocklets' has extra index related info compared with // 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets' prunedBlocklets = @@ -641,7 +626,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { private List<ExtendedBlocklet> intersectFilteredBlocklets(CarbonTable carbonTable, List<ExtendedBlocklet> previousIndexPrunedBlocklets, List<ExtendedBlocklet> otherIndexPrunedBlocklets) { - List<ExtendedBlocklet> prunedBlocklets = null; + List<ExtendedBlocklet> prunedBlocklets; if (BlockletIndexUtil.isCacheLevelBlock(carbonTable)) { prunedBlocklets = new ArrayList<>(); for (ExtendedBlocklet otherBlocklet : otherIndexPrunedBlocklets) { @@ -705,18 +690,15 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { .dataConverter(getDataTypeConverter(configuration)) .build(); String readDeltaOnly = configuration.get(READ_ONLY_DELTA); - if (readDeltaOnly != null && Boolean.parseBoolean(readDeltaOnly)) { + if (Boolean.parseBoolean(readDeltaOnly)) { queryModel.setReadOnlyDelta(true); } return queryModel; } /** - * This method will create an Implict Expression and set it as right child in the given + * This method will create an Implicit Expression and set it as right child in the given * expression - * - * @param expression - * @param inputSplit */ private void checkAndAddImplicitExpression(Expression expression, InputSplit inputSplit) { if (inputSplit instanceof CarbonMultiBlockSplit) { @@ -793,7 +775,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { /** * It is optional, if user does not set then it reads from store * - * @param configuration + * @param configuration hadoop configuration * @param converterClass is the Data type converter for different computing engine */ public static void setDataTypeConverter( @@ -825,11 +807,11 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { public static String getDatabaseName(Configuration configuration) throws InvalidConfigurationException { - String databseName = configuration.get(DATABASE_NAME); - if (null == databseName) { + String databaseName = configuration.get(DATABASE_NAME); + if (null == databaseName) { throw new 
InvalidConfigurationException("Database name is not set."); } - return databseName; + return databaseName; } public static void setTableName(Configuration configuration, String tableName) { @@ -850,8 +832,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> { /** * Project all Columns for carbon reader * - * @return String araay of columnNames - * @param carbonTable + * @return String array of columnNames */ public String[] projectAllColumns(CarbonTable carbonTable) { List<ColumnSchema> colList = carbonTable.getTableInfo().getFactTable().getListOfColumns(); diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java index 6aa3067..4fd754b 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java @@ -59,7 +59,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.log4j.Logger; /** - * Outputcommitter which manages the segments during loading.It commits segment information to the + * OutputCommitter which manages the segments during loading.It commits segment information to the * tablestatus file upon success or fail. */ public class CarbonOutputCommitter extends FileOutputCommitter { @@ -74,10 +74,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter { } /** - * Update the tablestatus with inprogress while setup the job. - * - * @param context - * @throws IOException + * Update the tablestatus with in-progress while setup the job. */ @Override public void setupJob(JobContext context) throws IOException { @@ -104,9 +101,6 @@ public class CarbonOutputCommitter extends FileOutputCommitter { /** * Update the tablestatus as success after job is success - * - * @param context - * @throws IOException */ @Override public void commitJob(JobContext context) throws IOException { @@ -230,7 +224,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter { } } String updateTime = - context.getConfiguration().get(CarbonTableOutputFormat.UPADTE_TIMESTAMP, null); + context.getConfiguration().get(CarbonTableOutputFormat.UPDATE_TIMESTAMP, null); String segmentsToBeDeleted = context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, ""); List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null); @@ -327,10 +321,6 @@ public class CarbonOutputCommitter extends FileOutputCommitter { /** * Overwrite the partitions in case of overwrite query. It just updates the partition map files * of all segment files. 
- * - * @param loadModel - * @return - * @throws IOException */ private String overwritePartitions(CarbonLoadModel loadModel, LoadMetadataDetails newMetaEntry, String uuid) throws IOException { @@ -345,19 +335,18 @@ public class CarbonOutputCommitter extends FileOutputCommitter { new SegmentStatusManager(table.getAbsoluteTableIdentifier()) .getValidAndInvalidSegments(table.isMV()).getValidSegments(); String uniqueId = String.valueOf(System.currentTimeMillis()); - List<String> tobeUpdatedSegs = new ArrayList<>(); - List<String> tobeDeletedSegs = new ArrayList<>(); + List<String> toBeUpdatedSegments = new ArrayList<>(); + List<String> toBeDeletedSegments = new ArrayList<>(); // First drop the partitions from partition mapper files of each segment for (Segment segment : validSegments) { - new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName()) - .dropPartitions(segment, partitionSpecs, uniqueId, tobeDeletedSegs, tobeUpdatedSegs); - + new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName()).dropPartitions( + segment, partitionSpecs, uniqueId, toBeDeletedSegments, toBeUpdatedSegments); } newMetaEntry.setUpdateStatusFileName(uniqueId); // Commit the removed partitions in carbon store. CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid, - Segment.toSegmentList(tobeDeletedSegs, null), - Segment.toSegmentList(tobeUpdatedSegs, null)); + Segment.toSegmentList(toBeDeletedSegments, null), + Segment.toSegmentList(toBeUpdatedSegments, null)); return uniqueId; } return null; @@ -375,10 +364,6 @@ public class CarbonOutputCommitter extends FileOutputCommitter { /** * Update the tablestatus as fail if any fail happens.And also clean up the temp folders if any * are existed. - * - * @param context - * @param state - * @throws IOException */ @Override public void abortJob(JobContext context, JobStatus.State state) throws IOException { diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 2d06222..bd0f5d1 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -101,12 +101,9 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { private ReadCommittedScope readCommittedScope; /** - * {@inheritDoc} - * Configurations FileInputFormat.INPUT_DIR - * are used to get table path to read. 
- * - * @param job - * @return List<InputSplit> list of CarbonInputSplit + * get list of block/blocklet and make them to CarbonInputSplit + * @param job JobContext with Configuration + * @return list of CarbonInputSplit * @throws IOException */ @Override @@ -218,16 +215,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { } /** - * Method to check and refresh segment cache - * - * @param job - * @param carbonTable - * @param updateStatusManager - * @param filteredSegmentToAccess - * @throws IOException - */ - - /** * Return segment list after filtering out valid segments and segments set by user by * `INPUT_SEGMENT_NUMBERS` in job configuration */ @@ -276,7 +263,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { */ public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments, CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException { - List<InputSplit> splits = new ArrayList<InputSplit>(); + List<InputSplit> splits = new ArrayList<>(); if (streamSegments != null && !streamSegments.isEmpty()) { numStreamSegments = streamSegments.size(); long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); @@ -294,7 +281,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { streamPruner.init(filterResolverIntf); List<StreamFile> streamFiles = streamPruner.prune(streamSegments); // record the hit information of the streaming files - this.hitedStreamFiles = streamFiles.size(); + this.hitStreamFiles = streamFiles.size(); this.numStreamFiles = streamPruner.getTotalFileNums(); for (StreamFile streamFile : streamFiles) { Path path = new Path(streamFile.getFilePath()); @@ -358,7 +345,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> { } numSegments = validSegments.size(); - List<InputSplit> result = new LinkedList<InputSplit>(); + List<InputSplit> result = new LinkedList<>(); UpdateVO invalidBlockVOForSegmentId = null; boolean isIUDTable; diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index ebac3d4..f52030c 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -65,7 +65,7 @@ import org.apache.log4j.Logger; * creates new segment folder and manages the folder through tablestatus file. * It also generate and writes dictionary data during load only if dictionary server is configured. */ -// TODO Move dictionary generater which is coded in spark to MR framework. +// TODO Move dictionary generator which is coded in spark to MR framework. public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, ObjectArrayWritable> { protected static final String LOAD_MODEL = "mapreduce.carbontable.load.model"; @@ -98,7 +98,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje * Set the update timestamp if user sets in case of update query. 
It needs to be updated * in load status update time */ - public static final String UPADTE_TIMESTAMP = "mapreduce.carbontable.update.timestamp"; + public static final String UPDATE_TIMESTAMP = "mapreduce.carbontable.update.timestamp"; /** * During update query we first delete the old data and then add updated data to new segment, so @@ -108,11 +108,6 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje public static final String SEGMENTS_TO_BE_DELETED = "mapreduce.carbontable.segments.to.be.removed"; - /** - * It is used only to fire events in case of any child tables to be loaded. - */ - public static final String OPERATION_CONTEXT = "mapreduce.carbontable.operation.context"; - private static final Logger LOG = LogServiceFactory.getLogService(CarbonTableOutputFormat.class.getName()); @@ -269,29 +264,25 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName(), true)); // It should be started in new thread as the underlying iterator uses blocking queue. - Future future = executorService.submit(new Thread() { - @Override - public void run() { - ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo() - .put("carbonConf", taskAttemptContext.getConfiguration()); + Future future = executorService.submit(() -> { + ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo() + .put("carbonConf", taskAttemptContext.getConfiguration()); + try { + dataLoadExecutor.execute(loadModel, tempStoreLocations, iterators); + } catch (Exception e) { + executorService.shutdownNow(); + for (CarbonOutputIteratorWrapper iterator : iterators) { + iterator.closeWriter(true); + } try { - dataLoadExecutor - .execute(loadModel, tempStoreLocations, iterators); - } catch (Exception e) { - executorService.shutdownNow(); - for (CarbonOutputIteratorWrapper iterator : iterators) { - iterator.closeWriter(true); - } - try { - dataLoadExecutor.close(); - } catch (Exception ex) { - // As already exception happened before close() send that exception. - throw new RuntimeException(e); - } + dataLoadExecutor.close(); + } catch (Exception ex) { + // As already exception happened before close() send that exception. throw new RuntimeException(e); - } finally { - ThreadLocalSessionInfo.unsetAll(); } + throw new RuntimeException(e); + } finally { + ThreadLocalSessionInfo.unsetAll(); } }); @@ -433,15 +424,15 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje public static class CarbonRecordWriter extends RecordWriter<NullWritable, ObjectArrayWritable> { - private CarbonOutputIteratorWrapper iteratorWrapper; + private final CarbonOutputIteratorWrapper iteratorWrapper; - private DataLoadExecutor dataLoadExecutor; + private final DataLoadExecutor dataLoadExecutor; - private CarbonLoadModel loadModel; + private final CarbonLoadModel loadModel; - private ExecutorService executorService; + private final ExecutorService executorService; - private Future future; + private final Future future; private boolean isClosed; @@ -526,11 +517,11 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje and handles the load balancing of the write rows in round robin. 
*/ public static class CarbonMultiRecordWriter extends CarbonRecordWriter { - private CarbonOutputIteratorWrapper[] iterators; + private final CarbonOutputIteratorWrapper[] iterators; // keep counts of number of writes called // and it is used to load balance each write call to one iterator. - private AtomicLong counter; + private final AtomicLong counter; CarbonMultiRecordWriter(CarbonOutputIteratorWrapper[] iterators, DataLoadExecutor dataLoadExecutor, CarbonLoadModel loadModel, Future future, @@ -551,9 +542,9 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje @Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException { - for (int i = 0; i < iterators.length; i++) { - synchronized (iterators[i]) { - iterators[i].closeWriter(false); + for (CarbonOutputIteratorWrapper iterator : iterators) { + synchronized (iterator) { + iterator.closeWriter(false); } } super.close(taskAttemptContext); diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java index df37e09..2a6e54f 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java @@ -47,7 +47,7 @@ public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> { public static final String READ_BUFFER_SIZE_DEFAULT = "65536"; public static final String STREAM_RECORD_READER_INSTANCE = "org.apache.carbondata.stream.CarbonStreamRecordReader"; - // return raw row for handoff + // return raw row for hand off private boolean useRawRow = false; public void setUseRawRow(boolean useRawRow) { @@ -114,7 +114,7 @@ public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> { for (int i = 0; i < dimension.getNumberOfChild(); i++) { CarbonDimension child = dimension.getListOfChildDimensions().get(i); DataType dataType = child.getDataType(); - GenericQueryType queryType = null; + GenericQueryType queryType; if (DataTypes.isArrayType(dataType)) { queryType = new ArrayQueryType(child.getColName(), dimension.getColName(), ++parentColumnIndex); diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java index c060535..29170fa 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java @@ -33,9 +33,9 @@ public class CarbonStreamUtils { } - public static Object getInstanceWithReflection(Constructor cons, Object... initargs) throws + public static Object getInstanceWithReflection(Constructor cons, Object... 
initArgs) throws IllegalAccessException, InvocationTargetException, InstantiationException { - return cons.newInstance(initargs); + return cons.newInstance(initArgs); } } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java index be8bc5a..6d91e78 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java @@ -41,10 +41,10 @@ public class StreamBlockletReader { private final long limitStart; private final long limitEnd; private boolean isAlreadySync = false; - private Compressor compressor; + private final Compressor compressor; private int rowNums = 0; private int rowIndex = 0; - private boolean isHeaderPresent; + private final boolean isHeaderPresent; public StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, boolean isHeaderPresent, String compressorName) { @@ -175,7 +175,7 @@ public class StreamBlockletReader { int ch4 = in.read(); if ((ch1 | ch2 | ch3 | ch4) < 0) throw new EOFException(); pos += 4; - return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); + return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4); } /** diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java index 52b10dc..14637ee 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java @@ -113,8 +113,8 @@ public class StreamRecordReader extends RecordReader<Void, Object> { // empty project, null filter protected boolean skipScanData; - // return raw row for handoff - private boolean useRawRow = false; + // return raw row for hand off + private final boolean useRawRow; public StreamRecordReader(QueryModel mdl, boolean useRawRow) { this.model = mdl; @@ -137,7 +137,7 @@ public class StreamRecordReader extends RecordReader<Void, Object> { // metadata hadoopConf = context.getConfiguration(); if (model == null) { - CarbonTableInputFormat format = new CarbonTableInputFormat<Object>(); + CarbonTableInputFormat<Object> format = new CarbonTableInputFormat<>(); model = format.createQueryModel(split, context); } carbonTable = model.getTable(); @@ -169,19 +169,19 @@ public class StreamRecordReader extends RecordReader<Void, Object> { projection = model.getProjectionColumns(); isRequired = new boolean[storageColumns.length]; - boolean[] isFiltlerDimensions = model.getIsFilterDimensions(); - boolean[] isFiltlerMeasures = model.getIsFilterMeasures(); + boolean[] isFilterDimensions = model.getIsFilterDimensions(); + boolean[] isFilterMeasures = model.getIsFilterMeasures(); isFilterRequired = new boolean[storageColumns.length]; filterMap = new int[storageColumns.length]; for (int i = 0; i < storageColumns.length; i++) { if (storageColumns[i].isDimension()) { - if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) { + if (isFilterDimensions[storageColumns[i].getOrdinal()]) { isRequired[i] = true; isFilterRequired[i] = true; filterMap[i] = storageColumns[i].getOrdinal(); } } else { - if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) { + if (isFilterMeasures[storageColumns[i].getOrdinal()]) { isRequired[i] = true; isFilterRequired[i] = true; filterMap[i] = carbonTable.getDimensionOrdinalMax() + 
storageColumns[i].getOrdinal(); @@ -279,7 +279,7 @@ public class StreamRecordReader extends RecordReader<Void, Object> { scanMore = false; } else { if (useRawRow) { - // read raw row for streaming handoff which does not require decode raw row + // read raw row for streaming hand off which does not require decode raw row readRawRowFromStream(); } else { readRowFromStream(); @@ -346,11 +346,7 @@ public class StreamRecordReader extends RecordReader<Void, Object> { BitSet bitSet = filter .isScanRequired(minMaxIndex.getMaxValues(), minMaxIndex.getMinValues(), minMaxIndex.getIsMinMaxSet()); - if (bitSet.isEmpty()) { - return false; - } else { - return true; - } + return !bitSet.isEmpty(); } } return true; @@ -375,7 +371,7 @@ public class StreamRecordReader extends RecordReader<Void, Object> { } } else { if (isNoDictColumn[colCount]) { - int v = 0; + int v; if (dimensionsIsVarcharTypeMap[colCount]) { v = input.readInt(); } else { @@ -554,7 +550,7 @@ public class StreamRecordReader extends RecordReader<Void, Object> { outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY; } else { if (isNoDictColumn[colCount]) { - int v = 0; + int v; if (dimensionsIsVarcharTypeMap[colCount]) { v = input.readInt(); } else { diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java index 2abfe7c..7c1d04f 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java @@ -43,10 +43,10 @@ import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.converter.SchemaConverter; import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; +import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.SchemaEvolution; -import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.metadata.schema.table.TableSchema; @@ -86,9 +86,9 @@ public class StoreCreator { private static final Logger LOG = LogServiceFactory.getLogService(StoreCreator.class.getCanonicalName()); - private AbsoluteTableIdentifier absoluteTableIdentifier; - private String storePath = null; - private String csvPath; + private final AbsoluteTableIdentifier absoluteTableIdentifier; + private final String storePath; + private final String csvPath; private List<String> sortColumns = new ArrayList<>(); public StoreCreator(String storePath, String csvPath) { @@ -124,7 +124,7 @@ public class StoreCreator { loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); loadModel.setFactFilePath(factFilePath); - loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>()); + loadModel.setLoadMetadataDetails(new ArrayList<>()); loadModel.setTablePath(absoluteTableIdentifier.getTablePath()); loadModel.setDateFormat(null); loadModel.setCarbonTransactionalTable(table.isTransactionalTable()); @@ -198,103 +198,22 @@ public class 
StoreCreator { tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName()); TableSchema tableSchema = new TableSchema(); tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName()); - List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>(); + List<ColumnSchema> columnSchemas = new ArrayList<>(); ArrayList<Encoding> encodings = new ArrayList<>(); int schemaOrdinal = 0; - ColumnSchema id = new ColumnSchema(); - id.setColumnName("id"); - id.setDataType(DataTypes.INT); - id.setEncodingList(encodings); - id.setColumnUniqueId(UUID.randomUUID().toString()); - id.setColumnReferenceId(id.getColumnUniqueId()); - id.setDimensionColumn(true); - id.setSchemaOrdinal(schemaOrdinal++); - if (sortColumns.contains(id.getColumnName())) { - id.setSortColumn(true); - } - columnSchemas.add(id); - - ColumnSchema date = new ColumnSchema(); - date.setColumnName("date"); - date.setDataType(DataTypes.STRING); - date.setEncodingList(encodings); - date.setColumnUniqueId(UUID.randomUUID().toString()); - date.setDimensionColumn(true); - date.setColumnReferenceId(date.getColumnUniqueId()); - date.setSchemaOrdinal(schemaOrdinal++); - if (sortColumns.contains(date.getColumnName())) { - date.setSortColumn(true); - } - columnSchemas.add(date); - - ColumnSchema country = new ColumnSchema(); - country.setColumnName("country"); - country.setDataType(DataTypes.STRING); - country.setEncodingList(encodings); - country.setColumnUniqueId(UUID.randomUUID().toString()); - country.setDimensionColumn(true); - country.setSortColumn(true); - country.setSchemaOrdinal(schemaOrdinal++); - if (sortColumns.contains(country.getColumnName())) { - country.setSortColumn(true); - } - country.setColumnReferenceId(country.getColumnUniqueId()); - columnSchemas.add(country); - - ColumnSchema name = new ColumnSchema(); - name.setColumnName("name"); - name.setDataType(DataTypes.STRING); - name.setEncodingList(encodings); - name.setColumnUniqueId(UUID.randomUUID().toString()); - name.setDimensionColumn(true); - name.setSchemaOrdinal(schemaOrdinal++); - if (sortColumns.contains(name.getColumnName())) { - name.setSortColumn(true); - } - name.setColumnReferenceId(name.getColumnUniqueId()); - columnSchemas.add(name); - - ColumnSchema phonetype = new ColumnSchema(); - phonetype.setColumnName("phonetype"); - phonetype.setDataType(DataTypes.STRING); - phonetype.setEncodingList(encodings); - phonetype.setColumnUniqueId(UUID.randomUUID().toString()); - phonetype.setDimensionColumn(true); - phonetype.setSchemaOrdinal(schemaOrdinal++); - if (sortColumns.contains(phonetype.getColumnName())) { - phonetype.setSortColumn(true); - } - phonetype.setColumnReferenceId(phonetype.getColumnUniqueId()); - columnSchemas.add(phonetype); - - ColumnSchema serialname = new ColumnSchema(); - serialname.setColumnName("serialname"); - serialname.setDataType(DataTypes.STRING); - serialname.setEncodingList(encodings); - serialname.setColumnUniqueId(UUID.randomUUID().toString()); - serialname.setDimensionColumn(true); - serialname.setSchemaOrdinal(schemaOrdinal++); - if (sortColumns.contains(serialname.getColumnName())) { - serialname.setSortColumn(true); - } - serialname.setColumnReferenceId(serialname.getColumnUniqueId()); - columnSchemas.add(serialname); - ColumnSchema salary = new ColumnSchema(); - salary.setColumnName("salary"); - salary.setDataType(DataTypes.INT); - salary.setEncodingList(new ArrayList<Encoding>()); - salary.setColumnUniqueId(UUID.randomUUID().toString()); - salary.setDimensionColumn(false); - 
salary.setColumnReferenceId(salary.getColumnUniqueId()); - salary.setSchemaOrdinal(schemaOrdinal++); - columnSchemas.add(salary); - + addColumn(columnSchemas, encodings, schemaOrdinal++, "id", DataTypes.INT, true); + addColumn(columnSchemas, encodings, schemaOrdinal++, "date", DataTypes.STRING, true); + addColumn(columnSchemas, encodings, schemaOrdinal++, "country", DataTypes.STRING, true); + addColumn(columnSchemas, encodings, schemaOrdinal++, "name", DataTypes.STRING, true); + addColumn(columnSchemas, encodings, schemaOrdinal++, "phonetype", DataTypes.STRING, true); + addColumn(columnSchemas, encodings, schemaOrdinal++, "serialname", DataTypes.STRING, true); + addColumn(columnSchemas, encodings, schemaOrdinal, "salary", DataTypes.INT, false); // rearrange the column schema based on the sort order, if sort columns exists List<ColumnSchema> columnSchemas1 = reArrangeColumnSchema(columnSchemas); tableSchema.setListOfColumns(columnSchemas1); - SchemaEvolution schemaEvol = new SchemaEvolution(); - schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>()); - tableSchema.setSchemaEvolution(schemaEvol); + SchemaEvolution schemaEvolution = new SchemaEvolution(); + schemaEvolution.setSchemaEvolutionEntryList(new ArrayList<>()); + tableSchema.setSchemaEvolution(schemaEvolution); tableSchema.setTableId(UUID.randomUUID().toString()); tableInfo.setTableUniqueName( identifier.getCarbonTableIdentifier().getTableUniqueName() @@ -328,6 +247,22 @@ public class StoreCreator { return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName()); } + private void addColumn(List<ColumnSchema> columnSchemas, ArrayList<Encoding> encodings, + int schemaOrdinal, String name2, DataType dataType, boolean isDimensionColumn) { + ColumnSchema name = new ColumnSchema(); + name.setColumnName(name2); + name.setDataType(dataType); + name.setEncodingList(encodings); + name.setColumnUniqueId(UUID.randomUUID().toString()); + name.setColumnReferenceId(name.getColumnUniqueId()); + name.setDimensionColumn(isDimensionColumn); + name.setSchemaOrdinal(schemaOrdinal); + if (sortColumns.contains(name.getColumnName())) { + name.setSortColumn(true); + } + columnSchemas.add(name); + } + private List<ColumnSchema> reArrangeColumnSchema(List<ColumnSchema> columnSchemas) { List<ColumnSchema> newColumnSchema = new ArrayList<>(columnSchemas.size()); // add sort columns first @@ -357,10 +292,6 @@ public class StoreCreator { /** * Execute graph which will further load data - * - * @param loadModel - * @param storeLocation - * @throws Exception */ public static void loadData(CarbonLoadModel loadModel, String storeLocation) throws Exception { @@ -425,7 +356,7 @@ public class StoreCreator { writeLoadMetadata( loadModel.getCarbonDataLoadSchema(), loadModel.getTableName(), loadModel.getTableName(), - new ArrayList<LoadMetadataDetails>()); + new ArrayList<>()); } public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName, @@ -460,13 +391,8 @@ public class StoreCreator { writeOperation.setFailed(); throw ioe; } finally { - try { - if (null != brWriter) { - brWriter.flush(); - } - } catch (Exception e) { - throw e; - + if (null != brWriter) { + brWriter.flush(); } CarbonUtil.closeStreams(brWriter); @@ -477,11 +403,7 @@ public class StoreCreator { public static String readCurrentTime() { SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS); - String date = null; - - date = sdf.format(new Date()); - - return date; + return sdf.format(new Date()); } public 
static void main(String[] args) throws Exception { diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java index 9916a18..da47981 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java @@ -21,11 +21,8 @@ import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Locale; -import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.index.IndexUtil; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.hadoop.api.CarbonFileInputFormat; import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; import org.apache.hadoop.conf.Configuration; @@ -33,31 +30,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.log4j.Logger; /** * Utility class */ public class CarbonInputFormatUtil { - - /** - * Attribute for Carbon LOGGER. - */ - private static final Logger LOGGER = - LogServiceFactory.getLogService(CarbonProperties.class.getName()); - - public static <V> CarbonFileInputFormat<V> createCarbonFileInputFormat( - AbsoluteTableIdentifier identifier, Job job) throws IOException { - CarbonFileInputFormat<V> carbonInputFormat = new CarbonFileInputFormat<V>(); - CarbonTableInputFormat.setDatabaseName(job.getConfiguration(), - identifier.getCarbonTableIdentifier().getDatabaseName()); - CarbonTableInputFormat - .setTableName(job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName()); - FileInputFormat.addInputPath(job, new Path(identifier.getTablePath())); - setIndexJobIfConfigured(job.getConfiguration()); - return carbonInputFormat; - } - public static <V> CarbonTableInputFormat<V> createCarbonInputFormat( AbsoluteTableIdentifier identifier, Job job) throws IOException { @@ -73,9 +50,6 @@ public class CarbonInputFormatUtil { /** * This method set IndexJob if configured - * - * @param conf - * @throws IOException */ public static void setIndexJobIfConfigured(Configuration conf) throws IOException { String className = "org.apache.carbondata.indexserver.EmbeddedIndexJob"; @@ -87,8 +61,7 @@ public class CarbonInputFormatUtil { } public static JobID getJobId(java.util.Date date, int batch) { - String jobtrackerID = createJobTrackerID(date); - return new JobID(jobtrackerID, batch); + String jobTrackerID = createJobTrackerID(date); + return new JobID(jobTrackerID, batch); } - } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java index 72a8d09..d21a25a 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputSplitTaskInfo.java @@ -51,7 +51,7 @@ public class CarbonInputSplitTaskInfo implements Distributable { @Override public String[] getLocations() { - Set<String> locations = new HashSet<String>(); + Set<String> locations = new HashSet<>(); for (CarbonInputSplit splitInfo : carbonBlockInfoList) { try { locations.addAll(Arrays.asList(splitInfo.getLocations())); @@ -90,55 +90,47 @@ public class 
CarbonInputSplitTaskInfo implements Distributable { /** * Finding which node has the maximum number of blocks for it. - * - * @param splitList - * @return */ public static List<String> maxNoNodes(List<CarbonInputSplit> splitList) { boolean useIndex = true; - Integer maxOccurence = 0; + Integer maxOccurrence = 0; String maxNode = null; - Map<String, Integer> nodeAndOccurenceMapping = new TreeMap<>(); + Map<String, Integer> nodeAndOccurrenceMapping = new TreeMap<>(); - // populate the map of node and number of occurences of that node. + // populate the map of node and number of occurrences of that node. for (CarbonInputSplit split : splitList) { try { for (String node : split.getLocations()) { - Integer nodeOccurence = nodeAndOccurenceMapping.get(node); - if (null == nodeOccurence) { - nodeAndOccurenceMapping.put(node, 1); - } else { - nodeOccurence++; - } + nodeAndOccurrenceMapping.putIfAbsent(node, 1); } } catch (IOException e) { throw new RuntimeException("Fail to get location of split: " + split, e); } } - Integer previousValueOccurence = null; + Integer previousValueOccurrence = null; - // check which node is occured maximum times. - for (Map.Entry<String, Integer> entry : nodeAndOccurenceMapping.entrySet()) { + // check which node is occurred maximum times. + for (Map.Entry<String, Integer> entry : nodeAndOccurrenceMapping.entrySet()) { // finding the maximum node. - if (entry.getValue() > maxOccurence) { - maxOccurence = entry.getValue(); + if (entry.getValue() > maxOccurrence) { + maxOccurrence = entry.getValue(); maxNode = entry.getKey(); } - // first time scenario. initialzing the previous value. - if (null == previousValueOccurence) { - previousValueOccurence = entry.getValue(); + // first time scenario. initializing the previous value. + if (null == previousValueOccurrence) { + previousValueOccurrence = entry.getValue(); } else { // for the case where all the nodes have same number of blocks then // we need to return complete list instead of max node. - if (!Objects.equals(previousValueOccurence, entry.getValue())) { + if (!Objects.equals(previousValueOccurrence, entry.getValue())) { useIndex = false; } } } - // if all the nodes have equal occurence then returning the complete key set. + // if all the nodes have equal occurrence then returning the complete key set. if (useIndex) { - return new ArrayList<>(nodeAndOccurenceMapping.keySet()); + return new ArrayList<>(nodeAndOccurrenceMapping.keySet()); } // if any max node is found then returning the max node. diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java index 54f4bf7..491094d 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java @@ -69,10 +69,10 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> { private AbstractDetailQueryResultIterator iterator; - private QueryModel queryModel; + private final QueryModel queryModel; //This holds mapping of fetch index with respect to project col index. // it is used when same col is used in projection many times.So need to fetch only that col. 
- private List<Integer> projectionMapping = new ArrayList<>(); + private final List<Integer> projectionMapping = new ArrayList<>(); public CarbonVectorizedRecordReader(QueryModel queryModel) { this.queryModel = queryModel; @@ -171,16 +171,16 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> { } CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length]; - Map<String, Integer> colmap = new HashMap<>(); + Map<String, Integer> columnMap = new HashMap<>(); for (int i = 0; i < fields.length; i++) { vectors[i] = new CarbonColumnVectorImpl( CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT, fields[i].getDataType()); - if (colmap.containsKey(fields[i].getFieldName())) { - int reusedIndex = colmap.get(fields[i].getFieldName()); + if (columnMap.containsKey(fields[i].getFieldName())) { + int reusedIndex = columnMap.get(fields[i].getFieldName()); projectionMapping.add(reusedIndex); } else { - colmap.put(fields[i].getFieldName(), i); + columnMap.put(fields[i].getFieldName(), i); projectionMapping.add(i); } } diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 5424b66..4d1f3a3 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -193,7 +193,7 @@ class CarbonScanRDD[T: ClassTag]( logInfo( s""" | Identified no.of.streaming splits/tasks: ${ streamPartitions.size }, - | no.of.streaming files: ${format.getHitedStreamFiles}, + | no.of.streaming files: ${format.getHitStreamFiles}, | no.of.total streaming files: ${format.getNumStreamFiles}, | no.of.total streaming segement: ${format.getNumStreamSegments} """.stripMargin) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala index d9c2633..e25c6e1 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala @@ -174,7 +174,7 @@ with Serializable { // in load status update time val updateTimeStamp = options.get("updatetimestamp") if (updateTimeStamp.isDefined) { - conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get) + conf.set(CarbonTableOutputFormat.UPDATE_TIMESTAMP, updateTimeStamp.get) } conf.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName) CarbonTableOutputFormat.setLoadModel(conf, model)
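
A recurring change in this diff (CarbonFileInputFormat, CarbonTableOutputFormat, and others) is replacing anonymous inner classes that implement a functional interface — Comparator, CarbonFileFilter, Runnable — with lambdas or method references. A minimal, self-contained sketch of the Comparator case follows; the Split class, file paths, and class name below are hypothetical stand-ins for illustration, not CarbonData types.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class LambdaCleanupSketch {

  // Hypothetical stand-in for CarbonInputSplit: only the file path matters here.
  static class Split {
    private final String filePath;

    Split(String filePath) {
      this.filePath = filePath;
    }

    String getFilePath() {
      return filePath;
    }
  }

  public static void main(String[] args) {
    List<Split> splits = new ArrayList<>();
    splits.add(new Split("/store/part-0002.carbondata"));
    splits.add(new Split("/store/part-0001.carbondata"));

    // Before: the ordering spelled out as an anonymous Comparator class.
    splits.sort(new Comparator<Split>() {
      @Override
      public int compare(Split o1, Split o2) {
        return o1.getFilePath().compareTo(o2.getFilePath());
      }
    });

    // After: the same ordering via Comparator.comparing, mirroring the
    // splits.sort(Comparator.comparing(...)) change in CarbonFileInputFormat.
    splits.sort(Comparator.comparing(Split::getFilePath));

    splits.forEach(s -> System.out.println(s.getFilePath()));
  }
}

Both forms sort by file path; the lambda form only removes boilerplate, which is why these behavior-preserving hunks come with no new test cases.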
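
The single largest cleanup is in StoreCreator, where seven nearly identical ColumnSchema blocks (id, date, country, name, phonetype, serialname, salary) are folded into one parameterized addColumn helper. The sketch below shows the same extraction on a simplified, hypothetical Column class rather than the real ColumnSchema, so it stays self-contained.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

public class ColumnHelperSketch {

  // Hypothetical, much simplified stand-in for CarbonData's ColumnSchema.
  static class Column {
    String name;
    String type;
    boolean dimension;
    boolean sortColumn;
    int ordinal;
    String uniqueId;
  }

  private final List<String> sortColumns = Arrays.asList("country");

  // One parameterized helper replaces the repeated per-column blocks,
  // in the spirit of StoreCreator.addColumn in this commit.
  private void addColumn(List<Column> columns, int ordinal, String name,
      String type, boolean isDimension) {
    Column column = new Column();
    column.name = name;
    column.type = type;
    column.dimension = isDimension;
    column.ordinal = ordinal;
    column.uniqueId = UUID.randomUUID().toString();
    column.sortColumn = sortColumns.contains(name);
    columns.add(column);
  }

  public List<Column> buildSchema() {
    List<Column> columns = new ArrayList<>();
    int ordinal = 0;
    addColumn(columns, ordinal++, "id", "INT", true);
    addColumn(columns, ordinal++, "country", "STRING", true);
    addColumn(columns, ordinal, "salary", "INT", false);
    return columns;
  }

  public static void main(String[] args) {
    new ColumnHelperSketch().buildSchema()
        .forEach(c -> System.out.println(c.name + " sortColumn=" + c.sortColumn));
  }
}

One nuance visible in the diff itself: the old country block called setSortColumn(true) unconditionally before the sortColumns check, whereas the helper derives the flag from sortColumns alone, so country is a sort column only when the caller asks for it.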
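
A couple of smaller hunks drop redundant boolean guards — for example the READ_ONLY_DELTA check in CarbonInputFormat.createQueryModel loses its null check, and StreamRecordReader returns !bitSet.isEmpty() directly instead of an if/else. A tiny sketch of why the null guard is unnecessary; the method and class names here are illustrative only.

public class ParseBooleanSketch {

  // Before: an explicit null guard around Boolean.parseBoolean.
  static boolean readOnlyDeltaVerbose(String value) {
    return value != null && Boolean.parseBoolean(value);
  }

  // After: Boolean.parseBoolean already returns false for null,
  // so the guard adds nothing.
  static boolean readOnlyDeltaCompact(String value) {
    return Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    for (String value : new String[] {null, "false", "true", "TRUE"}) {
      System.out.println(value + " -> " + readOnlyDeltaVerbose(value)
          + " / " + readOnlyDeltaCompact(value));
    }
  }
}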