Repository: carbondata Updated Branches: refs/heads/master 05086e536 -> 877eabdd6
[CARBONDATA-2287] Events added for alter hive partition table Events added for alter hive partition table This closes #2107 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/877eabdd Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/877eabdd Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/877eabdd Branch: refs/heads/master Commit: 877eabdd6080c514e23a9cbfbaef9f78acc0d39f Parents: 05086e5 Author: rahulforallp <[email protected]> Authored: Tue Mar 27 12:20:04 2018 +0530 Committer: manishgupta88 <[email protected]> Committed: Wed Mar 28 19:54:39 2018 +0530 ---------------------------------------------------------------------- .../filesystem/AbstractDFSCarbonFile.java | 3 ++ .../carbondata/core/util/CarbonProperties.java | 50 ++++++++++---------- .../apache/carbondata/core/util/CarbonUtil.java | 24 ++++++++++ .../carbondata/events/AlterTableEvents.scala | 16 +++++++ .../org/apache/carbondata/events/Events.scala | 10 +++- .../carbondata/spark/rdd/StreamHandoffRDD.scala | 23 ++++----- ...arbonAlterTableAddHivePartitionCommand.scala | 14 +++++- ...rbonAlterTableDropHivePartitionCommand.scala | 12 +++++ .../CarbonAlterTableSplitPartitionCommand.scala | 12 +++++ .../impl/MeasureFieldConverterImpl.java | 5 +- .../store/CarbonFactDataHandlerModel.java | 30 ++++++++++-- 11 files changed, 156 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java index 8cf3efe..bf3292b 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java @@ -489,6 +489,9 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile { fs.create(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL), false, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(path), fs.getDefaultBlockSize(path), null).close(); + // Hadoop masks the permission according to the configured permission, so we need to set the permission + // forcefully + fs.setPermission(path, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); return true; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 6fa21bc..38f7513 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -874,7 +874,7 @@ public final class CarbonProperties { CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER)); // checking min and max . 0 , 100 is min & max. if (numberOfSegmentsToBePreserved < 0 || numberOfSegmentsToBePreserved > 100) { - LOGGER.error("The specified value for property " + LOGGER.warn("The specified value for property " + CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER + " is incorrect." + " Correct value should be in range of 0 -100. 
Taking the default value."); numberOfSegmentsToBePreserved = @@ -930,7 +930,7 @@ public final class CarbonProperties { } compactionSize[i++] = size; } catch (NumberFormatException e) { - LOGGER.error( + LOGGER.warn( "Given value for property" + CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD + " is not proper. Taking the default value " + CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD); @@ -953,7 +953,7 @@ public final class CarbonProperties { CarbonCommonConstants.NUM_CORES_LOADING, CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)); } catch (NumberFormatException exc) { - LOGGER.error("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING + LOGGER.warn("Configured value for property " + CarbonCommonConstants.NUM_CORES_LOADING + " is wrong. Falling back to the default value " + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL); @@ -974,18 +974,18 @@ public final class CarbonProperties { } catch (Exception e) { inMemoryChunkSizeInMB = Integer.parseInt(CarbonCommonConstants.OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT); - LOGGER.error("Problem in parsing the sort memory chunk size, setting with default value" + LOGGER.warn("Problem in parsing the sort memory chunk size, setting with default value" + inMemoryChunkSizeInMB); } if (inMemoryChunkSizeInMB > 1024) { inMemoryChunkSizeInMB = 1024; - LOGGER.error( + LOGGER.warn( "It is not recommended to increase the sort memory chunk size more than 1024MB, " + "so setting the value to " + inMemoryChunkSizeInMB); } else if (inMemoryChunkSizeInMB < 1) { inMemoryChunkSizeInMB = 1; - LOGGER.error( + LOGGER.warn( "It is not recommended to decrease the sort memory chunk size less than 1MB, " + "so setting the value to " + inMemoryChunkSizeInMB); @@ -1071,7 +1071,7 @@ public final class CarbonProperties { CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION)); if (numberOfDeltaFilesThreshold < 0 || 
numberOfDeltaFilesThreshold > 10000) { - LOGGER.error("The specified value for property " + LOGGER.warn("The specified value for property " + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect." + " Correct value should be in range of 0 -10000. Taking the default value."); @@ -1079,7 +1079,7 @@ public final class CarbonProperties { CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION); } } catch (NumberFormatException e) { - LOGGER.error("The specified value for property " + LOGGER.warn("The specified value for property " + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect." + " Correct value should be in range of 0 -10000. Taking the default value."); numberOfDeltaFilesThreshold = Integer @@ -1101,7 +1101,7 @@ public final class CarbonProperties { CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION)); if (numberOfDeltaFilesThreshold < 0 || numberOfDeltaFilesThreshold > 10000) { - LOGGER.error("The specified value for property " + LOGGER.warn("The specified value for property " + CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect." + " Correct value should be in range of 0 -10000. Taking the default value."); @@ -1109,7 +1109,7 @@ public final class CarbonProperties { CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION); } } catch (NumberFormatException e) { - LOGGER.error("The specified value for property " + LOGGER.warn("The specified value for property " + CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect." + " Correct value should be in range of 0 -10000. 
Taking the default value."); numberOfDeltaFilesThreshold = Integer @@ -1127,7 +1127,7 @@ public final class CarbonProperties { CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT); boolean validateBoolean = CarbonUtil.validateBoolean(usingMultiDirStr); if (!validateBoolean) { - LOGGER.error("The carbon.use.multiple.temp.dir configuration value is invalid." + LOGGER.warn("The carbon.use.multiple.temp.dir configuration value is invalid." + "Configured value: \"" + usingMultiDirStr + "\"." + "Data Load will not use multiple temp directories."); usingMultiDirStr = CarbonCommonConstants.CARBON_USE_MULTI_TEMP_DIR_DEFAULT; @@ -1144,7 +1144,7 @@ public final class CarbonProperties { CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT); boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel); if (!validateStorageLevel) { - LOGGER.error("The " + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL + LOGGER.warn("The " + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL + " configuration value is invalid. It will use default storage level(" + CarbonCommonConstants.CARBON_GLOBAL_SORT_RDD_STORAGE_LEVEL_DEFAULT + ") to persist rdd."); @@ -1173,7 +1173,7 @@ public final class CarbonProperties { } if (isInvalidValue) { - LOGGER.error("The specified value for property " + LOGGER.warn("The specified value for property " + CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM + " is incorrect. Correct value should be in range of 0 - 1000." + " Taking the default value: " @@ -1194,7 +1194,7 @@ public final class CarbonProperties { CarbonCommonConstants.defaultValueIsPersistEnabled); boolean validatePersistEnabled = CarbonUtil.validateBoolean(isPersistEnabled); if (!validatePersistEnabled) { - LOGGER.error("The " + CarbonCommonConstants.isPersistEnabled + LOGGER.warn("The " + CarbonCommonConstants.isPersistEnabled + " configuration value is invalid. 
It will use default value(" + CarbonCommonConstants.defaultValueIsPersistEnabled + ")."); @@ -1212,7 +1212,7 @@ public final class CarbonProperties { CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT); boolean validateStorageLevel = CarbonUtil.isValidStorageLevel(storageLevel); if (!validateStorageLevel) { - LOGGER.error("The " + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL + LOGGER.warn("The " + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL + " configuration value is invalid. It will use default storage level(" + CarbonCommonConstants.CARBON_UPDATE_STORAGE_LEVEL_DEFAULT + ") to persist dataset."); @@ -1232,7 +1232,7 @@ public final class CarbonProperties { || "BZIP2".equals(compressor) || "LZ4".equals(compressor)) { return compressor; } else { - LOGGER.error("The ".concat(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR) + LOGGER.warn("The ".concat(CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR) .concat(" configuration value is invalid. Only snappy,gzip,bip2,lz4 and") .concat(" empty are allowed. It will not compress the sort temp files by default")); return CarbonCommonConstants.CARBON_SORT_TEMP_COMPRESSOR_DEFAULT; @@ -1279,14 +1279,14 @@ public final class CarbonProperties { sortMemorySizeInMB = Integer.parseInt( carbonProperties.getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB)); } catch (NumberFormatException e) { - LOGGER.error( + LOGGER.warn( "The specified value for property " + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB + "is Invalid." + " Taking the default value." + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT); sortMemorySizeInMB = sortMemorySizeInMBDefault; } if (sortMemorySizeInMB < sortMemorySizeInMBDefault) { - LOGGER.error( + LOGGER.warn( "The specified value for property " + CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB + "is less than default value." + ". Taking the default value." 
+ CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT); @@ -1324,14 +1324,14 @@ public final class CarbonProperties { unsafeWorkingMemory = Integer.parseInt( carbonProperties.getProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB)); } catch (NumberFormatException e) { - LOGGER.error("The specified value for property " + LOGGER.warn("The specified value for property " + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT + "is invalid." + " Taking the default value." + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT); unsafeWorkingMemory = unsafeWorkingMemoryDefault; } if (unsafeWorkingMemory < unsafeWorkingMemoryDefault) { - LOGGER.error("The specified value for property " + LOGGER.warn("The specified value for property " + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT + "is less than the default value." + ". Taking the default value." + CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT); @@ -1349,14 +1349,14 @@ public final class CarbonProperties { unsafeSortStorageMemory = Integer.parseInt(carbonProperties .getProperty(CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB)); } catch (NumberFormatException e) { - LOGGER.error("The specified value for property " + LOGGER.warn("The specified value for property " + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB + "is invalid." + " Taking the default value." + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT); unsafeSortStorageMemory = unsafeSortStorageMemoryDefault; } if (unsafeSortStorageMemory < unsafeSortStorageMemoryDefault) { - LOGGER.error("The specified value for property " + LOGGER.warn("The specified value for property " + CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB + "is less than the default value." + " Taking the default value." 
+ CarbonCommonConstants.IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT); @@ -1397,7 +1397,7 @@ public final class CarbonProperties { .getProperty(CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES, CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT)); } catch (NumberFormatException exc) { - LOGGER.error( + LOGGER.warn( "The heap memory pooling threshold bytes is invalid. Using the default value " + CarbonCommonConstants.CARBON_HEAP_MEMORY_POOLING_THRESHOLD_BYTES_DEFAULT); thresholdSize = Integer.parseInt( @@ -1418,7 +1418,7 @@ public final class CarbonProperties { CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT)); preserveSeconds = preserveHours * 3600 * 1000L; } catch (NumberFormatException exc) { - LOGGER.error( + LOGGER.warn( "The value of '" + CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS + "' is invalid. Using the default value " + CarbonCommonConstants.CARBON_SEGMENT_LOCK_FILES_PRESERVE_HOURS_DEFAULT); @@ -1438,7 +1438,7 @@ public final class CarbonProperties { .getProperty(CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT, CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT_DEFAULT)); } catch (NumberFormatException exc) { - LOGGER.error( + LOGGER.warn( "The value of '" + CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT + "' is invalid. 
Using the default value " + CarbonCommonConstants.CARBON_INVISIBLE_SEGMENTS_PRESERVE_COUNT_DEFAULT); http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 1082d78..3c347db 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -99,6 +99,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.TBase; @@ -846,6 +848,28 @@ public final class CarbonUtil { } /** + * + * This method will check and create the given path with 777 permission + */ + public static boolean checkAndCreateFolderWithPermission(String path) { + boolean created = false; + try { + FileFactory.FileType fileType = FileFactory.getFileType(path); + if (FileFactory.isFileExist(path, fileType)) { + created = true; + } else { + FileFactory.createDirectoryAndSetPermission(path, + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + created = true; + } + } catch (IOException e) { + LOGGER.error(e); + } + return created; + } + + + /** * This method will return the size of a given file */ public static long getFileSize(String filePath) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala index 538df4a..7c4339f 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala @@ -215,3 +215,19 @@ case class AlterTableCompactionAbortEvent(sparkSession: SparkSession, case class AlterTableCompactionExceptionEvent(sparkSession: SparkSession, carbonTable: CarbonTable, alterTableModel: AlterTableModel) extends Event with AlterTableCompactionEventInfo + +/** + * pre event for standard hive partition + * @param sparkSession + * @param carbonTable + */ +case class PreAlterTableHivePartitionCommandEvent(sparkSession: SparkSession, + carbonTable: CarbonTable) extends Event with AlterTableHivePartitionInfo + +/** + * post event for standard hive partition + * @param sparkSession + * @param carbonTable + */ +case class PostAlterTableHivePartitionCommandEvent(sparkSession: SparkSession, + carbonTable: CarbonTable) extends Event with AlterTableHivePartitionInfo http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala index 799d8c4..d85e8ae 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala @@ -100,7 +100,7 @@ trait AlterTableCompactionStatusUpdateEventInfo { } /** - * event for alter_table_compaction + * event info for alter_table_compaction */ trait AlterTableCompactionEventInfo { 
val sparkSession: SparkSession @@ -108,6 +108,14 @@ trait AlterTableCompactionEventInfo { } /** + * event for alter table standard hive partition + */ +trait AlterTableHivePartitionInfo { + val sparkSession: SparkSession + val carbonTable: CarbonTable +} + +/** * event for DeleteSegmentById */ trait DeleteSegmentbyIdEventInfo { http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala index bf5f660..50102f1 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala @@ -311,18 +311,7 @@ object StreamHandoffRDD { SegmentStatus.INSERT_IN_PROGRESS, carbonLoadModel.getFactTimeStamp, false) - val operationContext = new OperationContext() - val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent = - new LoadTablePreStatusUpdateEvent( - carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier, - carbonLoadModel) - OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext) - CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, true, false) - val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent = - new LoadTablePostStatusUpdateEvent(carbonLoadModel) - OperationListenerBus.getInstance() - .fireEvent(loadTablePostStatusUpdateEvent, operationContext) // convert a streaming segment to columnar segment val status = new StreamHandoffRDD( sparkSession.sparkContext, @@ -355,7 +344,19 @@ object StreamHandoffRDD { } if (loadStatus == SegmentStatus.SUCCESS) { + val 
operationContext = new OperationContext() + val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent = + new LoadTablePreStatusUpdateEvent( + carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier, + carbonLoadModel) + OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext) + val done = updateLoadMetadata(handoffSegmenId, carbonLoadModel) + + val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent = + new LoadTablePostStatusUpdateEvent(carbonLoadModel) + OperationListenerBus.getInstance() + .fireEvent(loadTablePostStatusUpdateEvent, operationContext) if (!done) { val errorMessage = "Handoff failed due to failure in table status updation." LOGGER.audit("Handoff is failed for " + http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala index b0e6b94..b583c6a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala @@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentStatus import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.{OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, 
PreAlterTableHivePartitionCommandEvent} import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.processing.util.CarbonLoaderUtil @@ -68,7 +69,18 @@ case class CarbonAlterTableAddHivePartitionCommand( currParts.exists(p => part.equals(p)) }.asJava) } + val operationContext = new OperationContext + val preAlterTableHivePartitionCommandEvent = new PreAlterTableHivePartitionCommandEvent( + sparkSession, + table) + OperationListenerBus.getInstance() + .fireEvent(preAlterTableHivePartitionCommandEvent, operationContext) AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, ifNotExists).run(sparkSession) + val postAlterTableHivePartitionCommandEvent = PostAlterTableHivePartitionCommandEvent( + sparkSession, + table) + OperationListenerBus.getInstance() + .fireEvent(postAlterTableHivePartitionCommandEvent, operationContext) } Seq.empty[Row] } @@ -113,7 +125,7 @@ case class CarbonAlterTableAddHivePartitionCommand( loadModel.getSegmentId + "_" + loadModel.getFactTimeStamp + CarbonTablePath.SEGMENT_EXT newMetaEntry.setSegmentFile(segmentFileName) val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath) - CarbonUtil.checkAndCreateFolder(segmentsLoc) + CarbonUtil.checkAndCreateFolderWithPermission(segmentsLoc) val segmentPath = segmentsLoc + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName SegmentFileStore.writeSegmentFile(segmentFile, segmentPath) CarbonLoaderUtil.populateNewLoadMetaEntry( http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala index 407057e..c67d694 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala @@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.SegmentFileStore import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.events.{OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent} import org.apache.carbondata.spark.rdd.CarbonDropPartitionRDD /** @@ -89,6 +90,12 @@ case class CarbonAlterTableDropHivePartitionCommand( partition.location) } carbonPartitionsTobeDropped = new util.ArrayList[PartitionSpec](carbonPartitions.asJava) + val operationContext = new OperationContext + val preAlterTableHivePartitionCommandEvent = PreAlterTableHivePartitionCommandEvent( + sparkSession, + table) + OperationListenerBus.getInstance() + .fireEvent(preAlterTableHivePartitionCommandEvent, operationContext) // Drop the partitions from hive. 
AlterTableDropPartitionCommand( tableName, @@ -96,6 +103,11 @@ case class CarbonAlterTableDropHivePartitionCommand( ifExists, purge, retainData).run(sparkSession) + val postAlterTableHivePartitionCommandEvent = PostAlterTableHivePartitionCommandEvent( + sparkSession, + table) + OperationListenerBus.getInstance() + .fireEvent(postAlterTableHivePartitionCommandEvent, operationContext) } catch { case e: Exception => if (!ifExists) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala index 4b89296..1bdf414 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala @@ -41,6 +41,7 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.events.{OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent} import org.apache.carbondata.processing.loading.TableProcessingOperations import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.spark.partition.SplitPartitionCallable @@ -151,12 +152,23 @@ case class 
CarbonAlterTableSplitPartitionCommand( carbonLoadModel.setTablePath(tablePath) val loadStartTime = CarbonUpdateUtil.readCurrentTime carbonLoadModel.setFactTimeStamp(loadStartTime) + val operationContext = new OperationContext + val preAlterTableHivePartitionCommandEvent = PreAlterTableHivePartitionCommandEvent( + sparkSession, + table) + OperationListenerBus.getInstance() + .fireEvent(preAlterTableHivePartitionCommandEvent, operationContext) alterTableSplitPartition( sparkSession.sqlContext, splitPartitionModel.partitionId.toInt.toString, carbonLoadModel, oldPartitionIds.asScala.toList ) + val postAlterTableHivePartitionCommandEvent = PostAlterTableHivePartitionCommandEvent( + sparkSession, + table) + OperationListenerBus.getInstance() + .fireEvent(postAlterTableHivePartitionCommandEvent, operationContext) success = true } catch { case e: Exception => http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java index 6664a2c..724a312 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java @@ -95,8 +95,9 @@ public class MeasureFieldConverterImpl implements FieldConverter { } row.update(output, index); } catch (NumberFormatException e) { - LOGGER.warn( - "Cant not convert value to Numeric type value. Value considered as null."); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Can not convert value to Numeric type value. 
Value considered as null."); + } logHolder.setReason( CarbonDataProcessorUtil.prepareFailureReason(measure.getColName(), dataType)); output = null; http://git-wip-us.apache.org/repos/asf/carbondata/blob/877eabdd/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 8aa5bde..1d892e0 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.keygenerator.KeyGenerator; @@ -308,7 +309,7 @@ public class CarbonFactDataHandlerModel { measureDataTypes[i++] = msr.getDataType(); } carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes); - CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath); + CarbonUtil.checkAndCreateFolderWithPermission(carbonDataDirectoryPath); carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath); List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName); boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()]; @@ -337,9 +338,32 @@ public class CarbonFactDataHandlerModel { * @return data directory path */ private static String getCarbonDataFolderLocation(CarbonDataLoadConfiguration configuration) { + // configuration.getDataWritePath will not be null only in case of partition if 
(configuration.getDataWritePath() != null) { - CarbonUtil.checkAndCreateFolder(configuration.getDataWritePath()); - return configuration.getDataWritePath(); + String paths = configuration.getDataWritePath(); + AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier(); + String partPath = absoluteTableIdentifier.getTablePath(); + String[] dirs = paths.split(partPath); + /* Create the folders one by one and apply the permission to each of them; + otherwise creating the whole path in one go would set the permission on the last directory only. + e.g. paths="/home/rahul/Documents/store/carbonTable1/emp_name=rahul/loc=india/dept=rd" + So, dirs={"","/emp_name=rahul/loc=india/dept=rd"} + if (dirs.length > 1) then partDirs ={"","emp_name=rahul","loc=india","dept=rd"} + For each entry of partDirs, partPath (say "/home/rahul/Documents/store/carbonTable1") is + extended with "emp_name=rahul","loc=india","dept=rd" sequentially + */ + if (dirs.length > 1) { + String[] partDirs = dirs[1].split(CarbonCommonConstants.FILE_SEPARATOR); + for (String partDir : partDirs) { + if (!partDir.isEmpty()) { + partPath = partPath.concat(CarbonCommonConstants.FILE_SEPARATOR + partDir); + CarbonUtil.checkAndCreateFolderWithPermission(partPath); + } + } + } else { + CarbonUtil.checkAndCreateFolderWithPermission(paths); + } + return paths; } AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier(); String carbonDataDirectoryPath = CarbonTablePath
