[CARBONDATA-1903] Remove unused FileUtil and Optimize code This closes #1678
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/910d496c Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/910d496c Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/910d496c Branch: refs/heads/branch-1.3 Commit: 910d496cf2a668dc10b40c202889003fe5cfb556 Parents: 3b6f26c Author: xuchuanyin <[email protected]> Authored: Wed Jan 3 16:14:54 2018 +0800 Committer: QiangCai <[email protected]> Committed: Thu Jan 4 09:36:16 2018 +0800 ---------------------------------------------------------------------- .../common/logging/impl/FileUtil.java | 94 ------------ .../common/logging/impl/FileUtilTest_UT.java | 60 -------- .../page/encoding/DefaultEncodingFactory.java | 3 +- .../indexstore/BlockletDataMapIndexStore.java | 5 +- .../BlockletDataRefNodeWrapper.java | 10 +- .../blockletindex/SegmentIndexFileStore.java | 6 +- .../MultiDimKeyVarLengthEquiSplitGenerator.java | 5 +- ...tiDimKeyVarLengthVariableSplitGenerator.java | 5 +- .../carbondata/core/locks/ZooKeeperLocking.java | 17 +-- .../core/metadata/schema/table/CarbonTable.java | 11 +- .../core/scan/scanner/impl/FilterScanner.java | 3 +- .../statusmanager/SegmentStatusManager.java | 9 +- .../apache/carbondata/core/util/CarbonUtil.java | 3 +- .../core/util/DataTypeConverterImpl.java | 3 +- .../core/util/path/HDFSLeaseUtils.java | 3 +- .../carbondata/examples/AllDictionaryUtil.scala | 2 +- .../examples/CarbonSortColumnsExample.scala | 1 - .../examples/DataUpdateDeleteExample.scala | 9 +- .../carbondata/examples/StreamExample.scala | 2 +- .../hadoop/api/CarbonTableInputFormat.java | 9 +- .../carbondata/hadoop/util/SchemaReader.java | 9 +- .../carbondata/hive/CarbonArrayInspector.java | 3 +- .../carbondata/presto/CarbondataMetadata.java | 3 +- .../carbondata/presto/CarbondataRecordSet.java | 4 +- .../presto/impl/CarbonTableReader.java | 4 +- .../spark/rdd/CarbonMergeFilesRDD.scala | 2 +- .../spark/rdd/NewCarbonDataLoadRDD.scala | 2 - .../carbondata/spark/util/CommonUtil.scala | 1 - .../spark/util/CarbonReflectionUtils.scala | 3 - .../spark/rdd/CarbonDataRDDFactory.scala | 4 +- .../sql/CarbonDatasourceHadoopRelation.scala | 2 +- .../spark/sql/CarbonDictionaryDecoder.scala | 2 +- .../scala/org/apache/spark/sql/CarbonEnv.scala | 2 +- .../command/mutation/DeleteExecution.scala | 8 -- .../table/CarbonDescribeFormattedCommand.scala | 2 - .../command/timeseries/TimeSeriesUtil.scala | 143 ++++++++++++++++++ .../command/timeseries/TimeseriesUtil.scala | 144 ------------------- .../datasources/CarbonFileFormat.scala | 2 +- .../sql/execution/strategy/DDLStrategy.scala | 2 +- .../spark/sql/hive/CarbonAnalysisRules.scala | 8 +- .../spark/sql/hive/CarbonFileMetastore.scala | 6 +- .../sql/hive/CarbonPreAggregateRules.scala | 2 +- .../sql/parser/CarbonSpark2SqlParser.scala | 4 +- .../org/apache/spark/util/AlterTableUtil.scala | 2 +- .../loading/DataLoadProcessBuilder.java | 4 +- .../steps/DataWriterProcessorStepImpl.java | 5 +- .../processing/merger/CarbonDataMergerUtil.java | 4 +- .../merger/CompactionResultSortProcessor.java | 8 +- .../carbondata/streaming/StreamHandoffRDD.scala | 3 +- .../streaming/StreamSinkFactory.scala | 2 +- 50 files changed, 212 insertions(+), 438 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/common/src/main/java/org/apache/carbondata/common/logging/impl/FileUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/carbondata/common/logging/impl/FileUtil.java b/common/src/main/java/org/apache/carbondata/common/logging/impl/FileUtil.java deleted file mode 100644 index 1c419e1..0000000 --- a/common/src/main/java/org/apache/carbondata/common/logging/impl/FileUtil.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.common.logging.impl; - -import java.io.Closeable; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Properties; - -import org.apache.log4j.Logger; - -/** - * Provides file Utility - */ -public final class FileUtil { - - public static final String CARBON_PROPERTIES_FILE_PATH = "../../../conf/carbon.properties"; - private static final Logger LOG = Logger.getLogger(FileUtil.class.getName()); - private static Properties carbonProperties; - - private FileUtil() { - - } - - public static Properties getCarbonProperties() { - if (null == carbonProperties) { - loadProperties(); - } - - return carbonProperties; - } - - /** - * closes the stream - * - * @param stream stream to be closed. - */ - public static void close(Closeable stream) { - if (null != stream) { - try { - stream.close(); - } catch (IOException e) { - LOG.error("Exception while closing the Log stream"); - } - } - } - - private static void loadProperties() { - String property = System.getProperty("carbon.properties.filepath"); - if (null == property) { - property = CARBON_PROPERTIES_FILE_PATH; - } - File file = new File(property); - - FileInputStream fis = null; - try { - if (file.exists()) { - fis = new FileInputStream(file); - - carbonProperties = new Properties(); - carbonProperties.load(fis); - } - } catch (FileNotFoundException e) { - LOG.error("Could not find carbon properties file in the path " + property); - } catch (IOException e) { - LOG.error("Error while reading carbon properties file in the path " + property); - } finally { - if (null != fis) { - try { - fis.close(); - } catch (IOException e) { - LOG.error("Error while closing the file stream for carbon.properties"); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/common/src/test/java/org/apache/carbondata/common/logging/impl/FileUtilTest_UT.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/carbondata/common/logging/impl/FileUtilTest_UT.java b/common/src/test/java/org/apache/carbondata/common/logging/impl/FileUtilTest_UT.java deleted file mode 100644 index 57de080..0000000 --- a/common/src/test/java/org/apache/carbondata/common/logging/impl/FileUtilTest_UT.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.common.logging.impl; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; - -import junit.framework.TestCase; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class FileUtilTest_UT extends TestCase { - - /** - * @throws Exception - */ - @Before public void setUp() throws Exception { - File f = new File("myfile.txt"); - if (!f.exists()) { - f.createNewFile(); - } - } - - /** - * @throws Exception - */ - @After public void tearDown() throws Exception { - File f = new File("myfile.txt"); - if (f.exists()) { - f.delete(); - } - } - - @Test public void testClose() { - try { - FileInputStream in = new FileInputStream(new File("myfile.txt")); - FileUtil.close(in); - assertTrue(true); - } catch (FileNotFoundException e) { - assertTrue(false); - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java index 0e32115..5c668be 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java @@ -86,8 +86,7 @@ public class DefaultEncodingFactory extends EncodingFactory { } } - private ColumnPageEncoder createEncoderForDimensionLegacy(TableSpec.DimensionSpec columnSpec) { - TableSpec.DimensionSpec dimensionSpec = columnSpec; + private ColumnPageEncoder createEncoderForDimensionLegacy(TableSpec.DimensionSpec dimensionSpec) { Compressor compressor = CompressorFactory.getInstance().getCompressor(); switch (dimensionSpec.getColumnType()) { case GLOBAL_DICTIONARY: http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java index f3e55e6..5b13ac8 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java @@ -145,9 +145,8 @@ public class BlockletDataMapIndexStore @Override public BlockletDataMap getIfPresent( TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) { - BlockletDataMap dataMap = (BlockletDataMap) lruCache - .get(tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); - return dataMap; + return (BlockletDataMap) lruCache.get( + tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier()); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java index 7868308..dfc8a38 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNodeWrapper.java @@ -129,11 +129,11 @@ public class BlockletDataRefNodeWrapper implements DataRefNode { private DimensionColumnChunkReader getDimensionColumnChunkReader() throws IOException { ColumnarFormatVersion version = ColumnarFormatVersion.valueOf(blockInfos.get(index).getDetailInfo().getVersionNumber()); - DimensionColumnChunkReader dimensionColumnChunkReader = CarbonDataReaderFactory.getInstance() - .getDimensionColumnChunkReader(version, - blockInfos.get(index).getDetailInfo().getBlockletInfo(), dimensionLens, - blockInfos.get(index).getFilePath()); - return dimensionColumnChunkReader; + return CarbonDataReaderFactory.getInstance().getDimensionColumnChunkReader( + version, + blockInfos.get(index).getDetailInfo().getBlockletInfo(), + dimensionLens, + blockInfos.get(index).getFilePath()); } private MeasureColumnChunkReader getMeasureColumnChunkReader() throws IOException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java index 01cb1d7..444a67b 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java @@ -90,14 +90,12 @@ public class SegmentIndexFileStore { * @throws IOException */ public List<String> getIndexFilesFromMergeFile(String mergeFile) throws IOException { - List<String> indexFiles = new ArrayList<>(); ThriftReader thriftReader = new ThriftReader(mergeFile); thriftReader.open(); MergedBlockIndexHeader indexHeader = readMergeBlockIndexHeader(thriftReader); - List<String> file_names = indexHeader.getFile_names(); - indexFiles.addAll(file_names); + List<String> fileNames = indexHeader.getFile_names(); thriftReader.close(); - return indexFiles; + return fileNames; } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/core/src/main/java/org/apache/carbondata/core/keygenerator/columnar/impl/MultiDimKeyVarLengthEquiSplitGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/columnar/impl/MultiDimKeyVarLengthEquiSplitGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/columnar/impl/MultiDimKeyVarLengthEquiSplitGenerator.java index 72b250b..586f881 100644 --- a/core/src/main/java/org/apache/carbondata/core/keygenerator/columnar/impl/MultiDimKeyVarLengthEquiSplitGenerator.java +++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/columnar/impl/MultiDimKeyVarLengthEquiSplitGenerator.java @@ -71,10 +71,7 @@ public class MultiDimKeyVarLengthEquiSplitGenerator extends MultiDimKeyVarLength List<Integer>[] splits = new List[splitList.size()]; int i = 0; for (Set<Integer> splitLocal : splitList) { - List<Integer> range = new ArrayList<Integer>(CarbonCommonConstants.CONSTANT_SIZE_TEN); - for (Integer index : splitLocal) { - range.add(index); - } + List<Integer> range = new ArrayList<Integer>(splitLocal); splits[i++] = range; } for (int j = 1; j < splits.length; j++) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/core/src/main/java/org/apache/carbondata/core/keygenerator/columnar/impl/MultiDimKeyVarLengthVariableSplitGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/columnar/impl/MultiDimKeyVarLengthVariableSplitGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/columnar/impl/MultiDimKeyVarLengthVariableSplitGenerator.java index 1ed4577..3f53994 100644 --- a/core/src/main/java/org/apache/carbondata/core/keygenerator/columnar/impl/MultiDimKeyVarLengthVariableSplitGenerator.java +++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/columnar/impl/MultiDimKeyVarLengthVariableSplitGenerator.java @@ -72,10 +72,7 @@ public class MultiDimKeyVarLengthVariableSplitGenerator extends MultiDimKeyVarLe List<Integer>[] splits = new List[splitList.size()]; int i = 0; for (Set<Integer> splitLocal : splitList) { - List<Integer> range = new ArrayList<Integer>(CarbonCommonConstants.CONSTANT_SIZE_TEN); - for (Integer index : splitLocal) { - range.add(index); - } + List<Integer> range = new ArrayList<Integer>(splitLocal); splits[i++] = range; } for (int j = 1; j < splits.length; j++) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java b/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java index 6fc2486..1de5004 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/ZooKeeperLocking.java @@ -127,20 +127,11 @@ public class ZooKeeperLocking extends AbstractCarbonLock { * @throws InterruptedException */ private void createRecursivly(String path) throws KeeperException, InterruptedException { - try { - if (zk.exists(path, true) == null && path.length() > 0) { - String temp = path.substring(0, path.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR)); - createRecursivly(temp); - zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } else { - return; - } - } catch (KeeperException e) { - throw e; - } catch (InterruptedException e) { - throw e; + if (zk.exists(path, true) == null && path.length() > 0) { + String temp = path.substring(0, path.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR)); + createRecursivly(temp); + zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } - } /** * Handling of the locking mechanism using zoo keeper. http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 8c4b08b..5e07fc0 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -164,18 +164,11 @@ public class CarbonTable implements Serializable { List<CarbonColumn> columns = new ArrayList<CarbonColumn>(); List<CarbonDimension> dimensions = this.tableDimensionsMap.get(tableName); List<CarbonMeasure> measures = this.tableMeasuresMap.get(tableName); - Iterator<CarbonDimension> dimItr = dimensions.iterator(); - while (dimItr.hasNext()) { - columns.add(dimItr.next()); - } - Iterator<CarbonMeasure> msrItr = measures.iterator(); - while (msrItr.hasNext()) { - columns.add(msrItr.next()); - } + columns.addAll(dimensions); + columns.addAll(measures); Collections.sort(columns, new Comparator<CarbonColumn>() { @Override public int compare(CarbonColumn o1, CarbonColumn o2) { - return Integer.compare(o1.getSchemaOrdinal(), o2.getSchemaOrdinal()); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java index 53c647c..efc0e20 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/FilterScanner.java @@ -96,8 +96,7 @@ public class FilterScanner extends AbstractBlockletScanner { */ @Override public AbstractScannedResult scanBlocklet(BlocksChunkHolder blocksChunkHolder) throws IOException, FilterUnsupportedException { - AbstractScannedResult result = fillScannedResult(blocksChunkHolder); - return result; + return fillScannedResult(blocksChunkHolder); } @Override public boolean isScanRequired(BlocksChunkHolder blocksChunkHolder) throws IOException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index 71d5a6a..7804ea8 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -165,14 +165,7 @@ public class SegmentStatusManager { LOG.error(e); throw e; } finally { - try { - if (null != dataInputStream) { - dataInputStream.close(); - } - } catch (Exception e) { - LOG.error(e); - throw e; - } + CarbonUtil.closeStreams(dataInputStream); } return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments, listOfInvalidSegments, listOfStreamSegments); http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index a9d2cad..0839ae9 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -1998,8 +1998,7 @@ public final class CarbonUtil { * @return */ public static Map<String, String> removeSchemaFromMap(Map<String, String> properties) { - Map<String, String> newMap = new HashMap<>(); - newMap.putAll(properties); + Map<String, String> newMap = new HashMap<>(properties); String partsNo = newMap.get("carbonSchemaPartsNo"); if (partsNo == null) { return newMap; http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java index afc81bf..d4b328d 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeConverterImpl.java @@ -26,8 +26,7 @@ public class DataTypeConverterImpl implements DataTypeConverter, Serializable { private static final long serialVersionUID = -1718154403432354200L; public Object convertToDecimal(Object data) { - java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data.toString()); - return javaDecVal; + return new java.math.BigDecimal(data.toString()); } public Object convertFromByteToUTF8String(Object data) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java index fcd1655..eef2507 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java @@ -114,14 +114,13 @@ public class HDFSLeaseUtils { */ private static boolean recoverLeaseOnFile(String filePath, Path path, DistributedFileSystem fs) throws IOException { - DistributedFileSystem dfs = fs; int maxAttempts = getLeaseRecoveryRetryCount(); int retryInterval = getLeaseRecoveryRetryInterval(); boolean leaseRecovered = false; IOException ioException = null; for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) { try { - leaseRecovered = dfs.recoverLease(path); + leaseRecovered = fs.recoverLease(path); if (!leaseRecovered) { try { LOGGER.info( http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/examples/spark2/src/main/scala/org/apache/carbondata/examples/AllDictionaryUtil.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/AllDictionaryUtil.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/AllDictionaryUtil.scala index 50d26aa..20f7bba 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/AllDictionaryUtil.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/AllDictionaryUtil.scala @@ -50,7 +50,7 @@ object AllDictionaryUtil { try { result += ((i, tokens(i))) } catch { - case ex: ArrayIndexOutOfBoundsException => + case _: ArrayIndexOutOfBoundsException => LOGGER.error("Read a bad record: " + x) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSortColumnsExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSortColumnsExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSortColumnsExample.scala index cea6341..3a9f26b 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSortColumnsExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSortColumnsExample.scala @@ -31,7 +31,6 @@ object CarbonSortColumnsExample { + "../../../..").getCanonicalPath val storeLocation = s"$rootPath/examples/spark2/target/store" val warehouse = s"$rootPath/examples/spark2/target/warehouse" - val metastoredb = s"$rootPath/examples/spark2/target" CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss") http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala index 267d365..2646b8a 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataUpdateDeleteExample.scala @@ -36,9 +36,8 @@ object DataUpdateDeleteExample { // for hdfs files // var rootPath = "hdfs://hdfs-host/carbon" - var storeLocation = s"$rootPath/examples/spark2/target/store" - var warehouse = s"$rootPath/examples/spark2/target/warehouse" - var metastoredb = s"$rootPath/examples/spark2/target" + val storeLocation = s"$rootPath/examples/spark2/target/store" + val warehouse = s"$rootPath/examples/spark2/target/warehouse" import org.apache.spark.sql.CarbonSession._ val spark = SparkSession @@ -61,7 +60,7 @@ object DataUpdateDeleteExample { spark.sql("DROP TABLE IF EXISTS t5") // Simulate data and write to table t3 - var sdf = new SimpleDateFormat("yyyy-MM-dd") + val sdf = new SimpleDateFormat("yyyy-MM-dd") var df = spark.sparkContext.parallelize(1 to 10) .map(x => (x, new java.sql.Date(sdf.parse("2015-07-" + (x % 10 + 10)).getTime), "china", "aaa" + x, "phone" + 555 * x, "ASD" + (60000 + x), 14999 + x)) @@ -153,7 +152,7 @@ object DataUpdateDeleteExample { """).show() // 5.Delete data WHERE id in (1, 2, $key) - var key = 3 + val key = 3 spark.sql(s""" DELETE FROM t3 WHERE t3_id in (1, 2, $key) """).show() http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala index b59e960..5ef9d2a 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala @@ -174,7 +174,7 @@ object StreamExample { qry.awaitTermination() } catch { - case ex => + case ex: Exception => ex.printStackTrace() println("Done reading and writing streaming data") } finally { http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 6d975a6..c02e03e 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -203,8 +203,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { private static DataMapJob getDataMapJob(Configuration configuration) throws IOException { String jobString = configuration.get(DATA_MAP_DSTR); if (jobString != null) { - DataMapJob dataMapJob = (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString); - return dataMapJob; + return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString); } return null; } @@ -450,8 +449,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { */ private List<String> getFilteredSegment(JobContext job, List<String> validSegments) { String[] segmentsToAccess = getSegmentsToAccess(job); - Set<String> segmentToAccessSet = new HashSet<>(); - segmentToAccessSet.addAll(Arrays.asList(segmentsToAccess)); + Set<String> segmentToAccessSet = new HashSet<>(Arrays.asList(segmentsToAccess)); List<String> filteredSegmentToAccess = new ArrayList<>(); if (segmentsToAccess.length == 0 || segmentsToAccess[0].equalsIgnoreCase("*")) { filteredSegmentToAccess.addAll(validSegments); @@ -462,8 +460,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { } } if (!filteredSegmentToAccess.containsAll(segmentToAccessSet)) { - List<String> filteredSegmentToAccessTemp = new ArrayList<>(); - filteredSegmentToAccessTemp.addAll(filteredSegmentToAccess); + List<String> filteredSegmentToAccessTemp = new ArrayList<>(filteredSegmentToAccess); filteredSegmentToAccessTemp.removeAll(segmentToAccessSet); LOG.info( "Segments ignored are : " + Arrays.toString(filteredSegmentToAccessTemp.toArray())); http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java index c0f8816..423bb2a 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java @@ -75,9 +75,10 @@ public class SchemaReader { new ThriftWrapperSchemaConverterImpl(); CarbonTableIdentifier carbonTableIdentifier = absoluteTableIdentifier.getCarbonTableIdentifier(); - TableInfo tableInfo = thriftWrapperSchemaConverter - .fromExternalToWrapperTableInfo(thriftTableInfo, carbonTableIdentifier.getDatabaseName(), - carbonTableIdentifier.getTableName(), absoluteTableIdentifier.getTablePath()); - return tableInfo; + return thriftWrapperSchemaConverter.fromExternalToWrapperTableInfo( + thriftTableInfo, + carbonTableIdentifier.getDatabaseName(), + carbonTableIdentifier.getTableName(), + absoluteTableIdentifier.getTablePath()); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java ---------------------------------------------------------------------- diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java index b26c959..8ebf7eb 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java @@ -120,8 +120,7 @@ class CarbonArrayInspector implements SettableListObjectInspector { } final Writable[] array = ((ArrayWritable) subObj).get(); - final List<Writable> list = Arrays.asList(array); - return list; + return Arrays.asList(array); } throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java index cee7c35..cc8043e 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java @@ -55,8 +55,7 @@ public class CarbondataMetadata implements ConnectorMetadata { } public List<String> listSchemaNamesInternal() { - List<String> schemaNameList = carbonTableReader.getSchemaNames();; - return schemaNameList; + return carbonTableReader.getSchemaNames(); } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java index fb2e06e..0f8fe87 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java @@ -81,9 +81,7 @@ public class CarbondataRecordSet implements RecordSet { CarbonVectorizedRecordReader vectorReader = new CarbonVectorizedRecordReader(queryExecutor, queryModel, (AbstractDetailQueryResultIterator) iterator); - RecordCursor rc = - new CarbondataRecordCursor(readSupport, vectorReader, columns, split); - return rc; + return new CarbondataRecordCursor(readSupport, vectorReader, columns, split); } catch (QueryExecutionException e) { throw new RuntimeException(e.getMessage(), e); } catch (Exception ex) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java index a79b17f..b0271ef 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java @@ -227,9 +227,7 @@ public class CarbonTableReader { } requireNonNull(schemaTableName, "schemaTableName is null"); - CarbonTable table = loadTableMetadata(schemaTableName); - - return table; + return loadTableMetadata(schemaTableName); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala index 6e8b000..87153f7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala @@ -22,7 +22,7 @@ import org.apache.spark.{Partition, SparkContext, TaskContext} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter -case class CarbonMergeFilePartition(rddId: Int, val idx: Int, segmentPath: String) +case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentPath: String) extends Partition { override val index: Int = idx http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 76e6965..290c8f8 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -22,7 +22,6 @@ import java.nio.ByteBuffer import java.text.SimpleDateFormat import java.util.{Date, UUID} -import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Random import scala.util.control.NonFatal @@ -43,7 +42,6 @@ import org.apache.carbondata.common.logging.impl.StandardLogService import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} -import org.apache.carbondata.core.statusmanager.LoadMetadataDetails import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo} import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses, TableProcessingOperations} import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator} http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index dba8f0e..e31f838 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -155,7 +155,6 @@ object CommonUtil { } def validateTblProperties(tableProperties: Map[String, String], fields: Seq[Field]): Boolean = { - val itr = tableProperties.keys var isValid: Boolean = true tableProperties.foreach { case (key, value) => http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala index 19b967a..2f15263 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala @@ -40,9 +40,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants */ object CarbonReflectionUtils { - - private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - private val rm = universe.runtimeMirror(getClass.getClassLoader) /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 32e8fb5..8a8338e 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -505,7 +505,7 @@ object CarbonDataRDDFactory { throw new Exception(status(0)._2._2.errorMsg) } // as no record loaded in new segment, new segment should be deleted - var newEntryLoadStatus = + val newEntryLoadStatus = if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap && !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) { @@ -635,7 +635,7 @@ object CarbonDataRDDFactory { val rddResult = new updateResultImpl() val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] { - var partitionID = "0" + val partitionID = "0" val loadMetadataDetails = new LoadMetadataDetails val executionErrors = ExecutionErrors(FailureCauses.NONE, "") var uniqueLoadStatusId = "" http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index ca0c51d..39a0d1e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -45,7 +45,7 @@ case class CarbonDatasourceHadoopRelation( isSubquery: ArrayBuffer[Boolean] = new ArrayBuffer[Boolean]()) extends BaseRelation with InsertableRelation { - var caseInsensitiveMap = parameters.map(f => (f._1.toLowerCase, f._2)) + val caseInsensitiveMap: Map[String, String] = parameters.map(f => (f._1.toLowerCase, f._2)) lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from( paths.head, CarbonEnv.getDatabaseName(caseInsensitiveMap.get("dbname"))(sparkSession), http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala index 21992dd..d045ab3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala @@ -454,7 +454,7 @@ object CarbonDictionaryDecoder { def isRequiredToDecode(colIdents: Array[(String, ColumnIdentifier, CarbonDimension)]): Boolean = { colIdents.find(p => p._1 != null) match { - case Some(value) => true + case Some(_) => true case _ => false } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index d9c50d0..82fbefa 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -21,7 +21,7 @@ import java.util.concurrent.ConcurrentHashMap import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.execution.command.timeseries.{TimeSeriesFunction} +import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction import org.apache.spark.sql.hive._ import org.apache.carbondata.common.logging.LogServiceFactory http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala index d2e8789..00d6657 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala @@ -321,12 +321,4 @@ object DeleteExecution { FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath)) (carbonInputFormat, job) } - - private def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier, - conf: Configuration) : CarbonTableInputFormat[V] = { - val carbonInputFormat = new CarbonTableInputFormat[V]() - val job: Job = new Job(conf) - FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath)) - carbonInputFormat - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala index 41656e5..3573e14 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala @@ -148,8 +148,6 @@ private[sql] case class CarbonDescribeFormattedCommand( results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns( relation.carbonTable.getTableName).asScala .map(column => column).mkString(","), "")) - val dimension = carbonTable - .getDimensionByTableName(relation.carbonTable.getTableName) if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null) { results ++= Seq(("Partition Columns", carbonTable.getPartitionInfo(carbonTable.getTableName) http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala new file mode 100644 index 0000000..4fe9df0 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.command.timeseries + +import org.apache.spark.sql.execution.command.{DataMapField, Field} + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.preagg.TimeSeriesUDF +import org.apache.carbondata.spark.exception.MalformedCarbonCommandException + +/** + * Utility class for time series to keep + */ +object TimeSeriesUtil { + + /** + * Below method will be used to validate whether column mentioned in time series + * is timestamp column or not + * + * @param dmproperties + * data map properties + * @param parentTable + * parent table + * @return whether time stamp column + */ + def validateTimeSeriesEventTime(dmproperties: Map[String, String], + parentTable: CarbonTable) { + val eventTime = dmproperties.get(CarbonCommonConstants.TIMESERIES_EVENTTIME) + if (!eventTime.isDefined) { + throw new MalformedCarbonCommandException("Eventtime not defined in time series") + } else { + val carbonColumn = parentTable.getColumnByName(parentTable.getTableName, eventTime.get) + if (carbonColumn.getDataType != DataTypes.TIMESTAMP) { + throw new MalformedCarbonCommandException( + "Timeseries event time is only supported on Timestamp " + + "column") + } + } + } + + /** + * Below method will be used to validate the hierarchy of time series and its value + * validation will be done whether hierarchy order is proper or not and hierarchy level + * value + * + * @param timeSeriesHierarchyDetails + * time series hierarchy string + */ + def validateAndGetTimeSeriesHierarchyDetails(timeSeriesHierarchyDetails: String): Array[ + (String, String)] = { + val updatedtimeSeriesHierarchyDetails = timeSeriesHierarchyDetails.toLowerCase + val timeSeriesHierarchy = updatedtimeSeriesHierarchyDetails.split(",") + val hierBuffer = timeSeriesHierarchy.map { + case f => + val splits = f.split("=") + // checking hierarchy name is valid or not + if (!TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION.contains(splits(0).toLowerCase)) { + throw new MalformedCarbonCommandException(s"Not supported heirarchy type: ${ splits(0) }") + } + // validating hierarchy level is valid or not + if (!splits(1).equals("1")) { + throw new MalformedCarbonCommandException( + s"Unsupported Value for hierarchy:" + + s"${ splits(0) }=${ splits(1) }") + } + (splits(0), splits(1)) + } + // checking whether hierarchy is in proper order or not + // get the index of first hierarchy + val indexOfFirstHierarchy = TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION + .indexOf(hierBuffer(0)._1.toLowerCase) + val index = 0 + // now iterating through complete hierarchy to check any of the hierarchy index + // is less than first one + for (index <- 1 to hierBuffer.size - 1) { + val currentIndex = TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION + .indexOf(hierBuffer(index)._1.toLowerCase) + if (currentIndex < indexOfFirstHierarchy) { + throw new MalformedCarbonCommandException(s"$timeSeriesHierarchyDetails is in wrong order") + } + } + hierBuffer + } + + /** + * Below method will be used to validate whether timeseries column present in + * select statement or not + * @param fieldMapping + * fields from select plan + * @param timeSeriesColumn + * timeseries column name + */ + def validateEventTimeColumnExitsInSelect(fieldMapping: scala.collection.mutable + .LinkedHashMap[Field, DataMapField], + timeSeriesColumn: String) : Any = { + val isTimeSeriesColumnExits = fieldMapping + .exists(obj => obj._2.columnTableRelationList.isDefined && + obj._2.columnTableRelationList.get(0).parentColumnName + .equalsIgnoreCase(timeSeriesColumn) && + obj._2.aggregateFunction.isEmpty) + if(!isTimeSeriesColumnExits) { + throw new MalformedCarbonCommandException(s"Time series column ${ timeSeriesColumn } does " + + s"not exists in select") + } + } + + /** + * Below method will be used to validate whether timeseries column present in + * select statement or not + * @param fieldMapping + * fields from select plan + * @param timeSeriesColumn + * timeseries column name + */ + def updateTimeColumnSelect(fieldMapping: scala.collection.mutable + .LinkedHashMap[Field, DataMapField], + timeSeriesColumn: String, + timeSeriesFunction: String) : Any = { + val isTimeSeriesColumnExits = fieldMapping + .find(obj => obj._2.columnTableRelationList.isDefined && + obj._2.columnTableRelationList.get(0).parentColumnName + .equalsIgnoreCase(timeSeriesColumn) && + obj._2.aggregateFunction.isEmpty) + isTimeSeriesColumnExits.get._2.aggregateFunction = timeSeriesFunction + } +} + http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala deleted file mode 100644 index d4358b6..0000000 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.command.timeseries - -import org.apache.spark.sql.execution.command.{DataMapField, Field} - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.datatype.DataTypes -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.preagg.TimeSeriesUDF -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException - -/** - * Utility class for time series to keep - */ -object TimeSeriesUtil { - - /** - * Below method will be used to validate whether column mentioned in time series - * is timestamp column or not - * - * @param dmproperties - * data map properties - * @param parentTable - * parent table - * @return whether time stamp column - */ - def validateTimeSeriesEventTime(dmproperties: Map[String, String], - parentTable: CarbonTable) { - val eventTime = dmproperties.get(CarbonCommonConstants.TIMESERIES_EVENTTIME) - if (!eventTime.isDefined) { - throw new MalformedCarbonCommandException("Eventtime not defined in time series") - } else { - val carbonColumn = parentTable.getColumnByName(parentTable.getTableName, eventTime.get) - if (carbonColumn.getDataType != DataTypes.TIMESTAMP) { - throw new MalformedCarbonCommandException( - "Timeseries event time is only supported on Timestamp " + - "column") - } - } - } - - /** - * Below method will be used to validate the hierarchy of time series and its value - * validation will be done whether hierarchy order is proper or not and hierarchy level - * value - * - * @param timeSeriesHierarchyDetails - * time series hierarchy string - */ - def validateAndGetTimeSeriesHierarchyDetails(timeSeriesHierarchyDetails: String): Array[ - (String, String)] = { - val updatedtimeSeriesHierarchyDetails = timeSeriesHierarchyDetails.toLowerCase - val timeSeriesHierarchy = updatedtimeSeriesHierarchyDetails.split(",") - val hierBuffer = timeSeriesHierarchy.map { - case f => - val splits = f.split("=") - // checking hierarchy name is valid or not - if (!TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION.contains(splits(0).toLowerCase)) { - throw new MalformedCarbonCommandException(s"Not supported heirarchy type: ${ splits(0) }") - - } - // validating hierarchy level is valid or not - if (!splits(1).equals("1")) { - throw new MalformedCarbonCommandException( - s"Unsupported Value for hierarchy:" + - s"${ splits(0) }=${ splits(1) }") - } - (splits(0), splits(1)) - } - // checking whether hierarchy is in proper order or not - // get the index of first hierarchy - val indexOfFirstHierarchy = TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION - .indexOf(hierBuffer(0)._1.toLowerCase) - val index = 0 - // now iterating through complete hierarchy to check any of the hierarchy index - // is less than first one - for (index <- 1 to hierBuffer.size - 1) { - val currentIndex = TimeSeriesUDF.INSTANCE.TIMESERIES_FUNCTION - .indexOf(hierBuffer(index)._1.toLowerCase) - if (currentIndex < indexOfFirstHierarchy) { - throw new MalformedCarbonCommandException(s"$timeSeriesHierarchyDetails is in wrong order") - } - } - hierBuffer - } - - /** - * Below method will be used to validate whether timeseries column present in - * select statement or not - * @param fieldMapping - * fields from select plan - * @param timeSeriesColumn - * timeseries column name - */ - def validateEventTimeColumnExitsInSelect(fieldMapping: scala.collection.mutable - .LinkedHashMap[Field, DataMapField], - timeSeriesColumn: String) : Any = { - val isTimeSeriesColumnExits = fieldMapping - .exists(obj => obj._2.columnTableRelationList.isDefined && - obj._2.columnTableRelationList.get(0).parentColumnName - .equalsIgnoreCase(timeSeriesColumn) && - obj._2.aggregateFunction.isEmpty) - if(!isTimeSeriesColumnExits) { - throw new MalformedCarbonCommandException(s"Time series column ${ timeSeriesColumn } does " + - s"not exists in select") - } - } - - /** - * Below method will be used to validate whether timeseries column present in - * select statement or not - * @param fieldMapping - * fields from select plan - * @param timeSeriesColumn - * timeseries column name - */ - def updateTimeColumnSelect(fieldMapping: scala.collection.mutable - .LinkedHashMap[Field, DataMapField], - timeSeriesColumn: String, - timeSeriesFunction: String) : Any = { - val isTimeSeriesColumnExits = fieldMapping - .find(obj => obj._2.columnTableRelationList.isDefined && - obj._2.columnTableRelationList.get(0).parentColumnName - .equalsIgnoreCase(timeSeriesColumn) && - obj._2.aggregateFunction.isEmpty) - isTimeSeriesColumnExits.get._2.aggregateFunction = timeSeriesFunction - } -} - http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala index 4b338a4..a8b7c74 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala @@ -73,7 +73,7 @@ with Serializable { "spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol") job.setOutputFormatClass(classOf[CarbonTableOutputFormat]) - var table = CarbonEnv.getCarbonTable( + val table = CarbonEnv.getCarbonTable( TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession) val model = new CarbonLoadModel val carbonProperty = CarbonProperties.getInstance() http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index 71da25b..2805114 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -104,7 +104,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { try { compactionType = CompactionType.valueOf(altertablemodel.compactionType.toUpperCase) } catch { - case _ => + case _: Exception => throw new MalformedCarbonCommandException( "Unsupported alter operation on carbon table") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala index ff7e06a..4996bec 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala @@ -60,7 +60,7 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica relation } else if (SPARK_VERSION.startsWith("2.2")) { alias match { - case Some(a) => + case Some(_) => CarbonReflectionUtils.getSubqueryAlias( sparkSession, alias, @@ -92,13 +92,13 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica attr match { case UnresolvedAlias(child22, _) => UnresolvedAlias(Alias(child22, col + "-updatedColumn")()) - case UnresolvedAttribute(param) => + case UnresolvedAttribute(_) => UnresolvedAlias(Alias(attr, col + "-updatedColumn")()) case _ => attr } } val tableName: Option[Seq[String]] = alias match { - case Some(a) => Some(alias.toSeq) + case Some(_) => Some(alias.toSeq) case _ => Some(Seq(child.asInstanceOf[UnresolvedRelation].tableIdentifier.table.toString)) } val list = Seq( @@ -175,7 +175,7 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica Project(projList, relation) } else if (SPARK_VERSION.startsWith("2.2")) { alias match { - case Some(a) => + case Some(_) => val subqueryAlias = CarbonReflectionUtils.getSubqueryAlias( sparkSession, alias, http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 719f571..1c9ca5d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -80,8 +80,6 @@ class CarbonFileMetastore extends CarbonMetaStore { tableModifiedTimeStore .put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, System.currentTimeMillis()) - private val nextId = new AtomicLong(0) - def nextQueryId: String = { System.nanoTime() + "" } @@ -201,7 +199,7 @@ class CarbonFileMetastore extends CarbonMetaStore { try { lookupRelation(tableIdentifier)(sparkSession) } catch { - case e: Exception => + case _: Exception => return false } true @@ -418,7 +416,7 @@ class CarbonFileMetastore extends CarbonMetaStore { val fileType = FileFactory.getFileType(tablePath) FileFactory.isFileExist(tablePath, fileType) } catch { - case e: Exception => + case _: Exception => false } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index 79561c6..1c93617 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -369,7 +369,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule // map to maintain attribute reference present in the filter to timeseries function // if applied this is added to avoid duplicate column val mapOfColumnSeriesFun = scala.collection.mutable.HashMap.empty[AttributeReference, String] - var isValidPlan = true + val isValidPlan = true filterExp.transform { case attr: AttributeReference => if (!mapOfColumnSeriesFun.get(attr).isDefined) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 5d00a0c..a25be06 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -212,8 +212,8 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { val relation : UnresolvedRelation = tab._1 match { case r@CarbonUnresolvedRelation(tableIdentifier) => tab._3 match { - case Some(a) => (updateRelation(r, tableIdentifier, tab._4, Some(tab._3.get))) - case None => (updateRelation(r, tableIdentifier, tab._4, None)) + case Some(a) => updateRelation(r, tableIdentifier, tab._4, Some(tab._3.get)) + case None => updateRelation(r, tableIdentifier, tab._4, None) } case _ => tab._1 } http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index e757836..8ebd5a9 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -335,7 +335,7 @@ object AlterTableUtil { LOGGER.audit(s"Alter table properties request has been received for $dbName.$tableName") val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) var locks = List.empty[ICarbonLock] - var timeStamp = 0L + val timeStamp = 0L var carbonTable: CarbonTable = null try { locks = AlterTableUtil http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java index 87d5b97..f7eff81 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java @@ -93,9 +93,7 @@ public final class DataLoadProcessBuilder { AbstractDataLoadProcessorStep converterProcessorStep = new DataConverterProcessorStepImpl(configuration, inputProcessorStep); // 3. Writes the sorted data in carbondata format. - AbstractDataLoadProcessorStep writerProcessorStep = - new CarbonRowDataWriterProcessorStepImpl(configuration, converterProcessorStep); - return writerProcessorStep; + return new CarbonRowDataWriterProcessorStepImpl(configuration, converterProcessorStep); } private AbstractDataLoadProcessorStep buildInternalForBatchSort(CarbonIterator[] inputIterators, http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java index e8e2b0e..d321405 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java @@ -78,9 +78,8 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { CarbonTableIdentifier tableIdentifier = configuration.getTableIdentifier().getCarbonTableIdentifier(); String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId)); - CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel - .createCarbonFactDataHandlerModel(configuration, storeLocation, partitionId, 0); - return model; + return CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(configuration, + storeLocation, partitionId, 0); } @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index 26648d0..be27866 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -978,9 +978,7 @@ public final class CarbonDataMergerUtil { for (String segName : deleteSegments) { List<String> tempSegments = getDeleteDeltaFilesInSeg(segName, segmentUpdateStatusManager, numberDeleteDeltaFilesThreshold); - for (String tempSeg : tempSegments) { - validSegments.add(tempSeg); - } + validSegments.addAll(tempSegments); } } } else if (CompactionType.IUD_UPDDEL_DELTA == compactionTypeIUD) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index de3572e..2480a39 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -302,17 +302,17 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { dataHandler.finish(); } catch (CarbonDataWriterException e) { LOGGER.error(e); - throw new Exception("Problem loading data during compaction: " + e.getMessage()); + throw new Exception("Problem loading data during compaction.", e); } catch (Exception e) { LOGGER.error(e); - throw new Exception("Problem loading data during compaction: " + e.getMessage()); + throw new Exception("Problem loading data during compaction.", e); } finally { if (null != dataHandler) { try { dataHandler.closeHandler(); } catch (CarbonDataWriterException e) { - LOGGER.error(e); - throw new Exception("Problem loading data during compaction: " + e.getMessage()); + LOGGER.error(e, "Error in close data handler"); + throw new Exception("Error in close data handler", e); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala index c88575e..4862604 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala @@ -287,7 +287,6 @@ object StreamHandoffRDD { sparkSession: SparkSession, handoffSegmenId: String ): Unit = { - val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable var loadStatus = SegmentStatus.SUCCESS var errorMessage: String = "Handoff failure" try { @@ -313,7 +312,7 @@ object StreamHandoffRDD { } } } catch { - case ex => + case ex: Exception => loadStatus = SegmentStatus.LOAD_FAILURE errorMessage = errorMessage + ": " + ex.getCause.getMessage LOGGER.error(errorMessage) http://git-wip-us.apache.org/repos/asf/carbondata/blob/910d496c/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala index 02a076e..b720c1a 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala @@ -91,7 +91,7 @@ object StreamSinkFactory { CarbonCommonConstants.HANDOFF_SIZE_MIN) } } catch { - case ex: NumberFormatException => + case _: NumberFormatException => new CarbonStreamException(CarbonCommonConstants.HANDOFF_SIZE + s" $segmentSize is an illegal number") }
