Hi Aaron, can you please run compaction again with *carbon.local.dictionary.decoder.fallback=false* and share the result?
-Regards Kumar Vishal On Thu, Sep 27, 2018 at 7:37 PM aaron <[email protected]> wrote: > This is the method I construct carbon instance, hope this can help you. > > def carbonSession(appName: String, masterUrl: String, parallelism: String, > logLevel: String, hdfsUrl: > String="hdfs://ec2-dca-aa-p-sdn-16.appannie.org:9000"): SparkSession = { > val storeLocation = s"${hdfsUrl}/usr/carbon/data" > > CarbonProperties.getInstance() > .addProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation) > .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true") > .addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true") > .addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION, > CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCKLET) > .addProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, > "false") > .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true") > //.addProperty(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, "true") > .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true") > > .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, > "4,3") > .addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, "0") > .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, > s"${hdfsUrl}/usr/carbon/badrecords") > .addProperty(CarbonCommonConstants.CARBON_QUERY_MIN_MAX_ENABLED, > "true") > .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false") > .addProperty(CarbonCommonConstants.ENABLE_DATA_LOADING_STATISTICS, > "false") > .addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME, "2") // > 2 minutes > .addProperty(CarbonCommonConstants.LOCK_TYPE, "HDFSLOCK") > .addProperty(CarbonCommonConstants.LOCK_PATH, > s"${hdfsUrl}/usr/carbon/lock") > .addProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD, > s"${parallelism}") > > > .addProperty(CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT, > "100") > .addProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, > 
s"${parallelism}") > .addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, "LOCAL_SORT") > .addProperty(CarbonCommonConstants.NUM_CORES_COMPACTING, > s"${parallelism}") > .addProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, > "4096") > .addProperty(CarbonCommonConstants.NUM_CORES_LOADING, > s"${parallelism}") > .addProperty(CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE, > "1024") > .addProperty(CarbonCommonConstants.BLOCKLET_SIZE, "64") > //.addProperty(CarbonCommonConstants.TABLE_BLOCKLET_SIZE, "64") > > import org.apache.spark.sql.CarbonSession._ > > val carbon = SparkSession > .builder() > .master(masterUrl) > .appName(appName) > .config("spark.hadoop.fs.s3a.impl", > "org.apache.hadoop.fs.s3a.S3AFileSystem") > .config("spark.hadoop.dfs.replication", 1) > .config("spark.cores.max", s"${parallelism}") > .getOrCreateCarbonSession(storeLocation) > > carbon.sparkContext.hadoopConfiguration.setInt("dfs.replication", 1) > > carbon.sql(s"SET spark.default.parallelism=${parallelism}") > carbon.sql(s"SET spark.sql.shuffle.partitions=${parallelism}") > carbon.sql(s"SET spark.sql.cbo.enabled=true") > carbon.sql(s"SET carbon.options.bad.records.logger.enable=true") > > carbon.sparkContext.setLogLevel(logLevel) > carbon > } > > > > -- > Sent from: > http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/ >
