kunal642 commented on a change in pull request #3908: URL: https://github.com/apache/carbondata/pull/3908#discussion_r486993964
########## File path: integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManger.scala ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.net.URI +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.log4j.Logger +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.cache.{Cache, Cacheable, CarbonLRUCache} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.index.Segment +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object PartitionCacheManager extends Cache[PartitionCacheKey, + java.util.List[CatalogTablePartition]] { + + private val CACHE = new CarbonLRUCache( + 
CarbonCommonConstants.CARBON_PARTITION_MAX_DRIVER_LRU_CACHE_SIZE, + CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT) + + val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName) + + def get(identifier: PartitionCacheKey): java.util.List[CatalogTablePartition] = { + LOGGER.info("Reading partition values from store") + // read the tableStatus file to get valid and invalid segments + val validInvalidSegments = new SegmentStatusManager(AbsoluteTableIdentifier.from( + identifier.tablePath, null, null, identifier.tableId)) + .getValidAndInvalidSegments + val cacheablePartitionSpecs = validInvalidSegments.getValidSegments.asScala.map { segment => + val segmentFileName = segment.getSegmentFileName + val segmentFilePath = FileFactory.getCarbonFile( + CarbonTablePath.getSegmentFilePath(identifier.tablePath, segmentFileName)) + // read the last modified time + val segmentFileModifiedTime = segmentFilePath.getLastModifiedTime + val existingCache = CACHE.get(identifier.tableId) + if (existingCache != null) { + val segmentCache = CACHE.get(identifier.tableId).asInstanceOf[CacheablePartitionSpec] + .partitionSpecs.get(segment.getSegmentNo) + segmentCache match { + case Some(c) => + // check if cache needs to be updated + if (segmentFileModifiedTime > c._2) { + (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } else { + (segment.getSegmentNo, c) + } + case None => + (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } + } else { + // read the partitions if not available in cache. 
+ (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } + }.toMap + // remove all invalid segment entries from cache + val finalCache = cacheablePartitionSpecs -- + validInvalidSegments.getInvalidSegments.asScala.map(_.getSegmentNo) + val cacheObject = CacheablePartitionSpec(finalCache) + if (finalCache.nonEmpty) { Review comment: Can be empty during first load. in that case empty partitions would be stored in the cache ########## File path: integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util + +import java.net.URI +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.log4j.Logger +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.cache.{Cache, Cacheable, CarbonLRUCache} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.index.Segment +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object PartitionCacheManager extends Cache[PartitionCacheKey, + java.util.List[CatalogTablePartition]] { + + private val CACHE = new CarbonLRUCache( + CarbonCommonConstants.CARBON_PARTITION_MAX_DRIVER_LRU_CACHE_SIZE, + CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT) + + val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName) + + def get(identifier: PartitionCacheKey): java.util.List[CatalogTablePartition] = { + LOGGER.info("Reading partition values from store") + // read the tableStatus file to get valid and invalid segments + val validInvalidSegments = new SegmentStatusManager(AbsoluteTableIdentifier.from( + identifier.tablePath, null, null, identifier.tableId)) + .getValidAndInvalidSegments + val cacheablePartitionSpecs = validInvalidSegments.getValidSegments.asScala.map { segment => + val segmentFileName = segment.getSegmentFileName + val segmentFilePath = FileFactory.getCarbonFile( + CarbonTablePath.getSegmentFilePath(identifier.tablePath, segmentFileName)) + // read the last modified time + val segmentFileModifiedTime = segmentFilePath.getLastModifiedTime + val existingCache = 
CACHE.get(identifier.tableId) + if (existingCache != null) { + val segmentCache = CACHE.get(identifier.tableId).asInstanceOf[CacheablePartitionSpec] + .partitionSpecs.get(segment.getSegmentNo) + segmentCache match { + case Some(c) => + // check if cache needs to be updated + if (segmentFileModifiedTime > c._2) { + (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } else { + (segment.getSegmentNo, c) + } + case None => + (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } + } else { + // read the partitions if not available in cache. + (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } + }.toMap + // remove all invalid segment entries from cache + val finalCache = cacheablePartitionSpecs -- + validInvalidSegments.getInvalidSegments.asScala.map(_.getSegmentNo) + val cacheObject = CacheablePartitionSpec(finalCache) + if (finalCache.nonEmpty) { Review comment: this check is to avoid caching empty cache map ########## File path: integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.net.URI +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.log4j.Logger +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.cache.{Cache, Cacheable, CarbonLRUCache} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.index.Segment +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object PartitionCacheManager extends Cache[PartitionCacheKey, + java.util.List[CatalogTablePartition]] { + + private val CACHE = new CarbonLRUCache( + CarbonCommonConstants.CARBON_PARTITION_MAX_DRIVER_LRU_CACHE_SIZE, + CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT) + + val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName) + + def get(identifier: PartitionCacheKey): java.util.List[CatalogTablePartition] = { + LOGGER.info("Reading partition values from store") + // read the tableStatus file to get valid and invalid segments + val validInvalidSegments = new SegmentStatusManager(AbsoluteTableIdentifier.from( + identifier.tablePath, null, null, identifier.tableId)) + .getValidAndInvalidSegments + val cacheablePartitionSpecs = validInvalidSegments.getValidSegments.asScala.map { segment => + val segmentFileName = segment.getSegmentFileName + val segmentFilePath = FileFactory.getCarbonFile( + CarbonTablePath.getSegmentFilePath(identifier.tablePath, segmentFileName)) + // read the last modified time + val 
segmentFileModifiedTime = segmentFilePath.getLastModifiedTime + val existingCache = CACHE.get(identifier.tableId) + if (existingCache != null) { + val segmentCache = CACHE.get(identifier.tableId).asInstanceOf[CacheablePartitionSpec] + .partitionSpecs.get(segment.getSegmentNo) + segmentCache match { + case Some(c) => + // check if cache needs to be updated + if (segmentFileModifiedTime > c._2) { + (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } else { + (segment.getSegmentNo, c) + } + case None => + (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } + } else { + // read the partitions if not available in cache. + (segment.getSegmentNo, (readPartition(identifier, Review comment: @ajantha-bhat This is disabled by default, so if the user feels that hive metastore getPartitions is slow, can enable this to improve performance. ########## File path: integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util + +import java.net.URI +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.log4j.Logger +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.cache.{Cache, Cacheable, CarbonLRUCache} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.index.Segment +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object PartitionCacheManager extends Cache[PartitionCacheKey, + java.util.List[CatalogTablePartition]] { + + private val CACHE = new CarbonLRUCache( + CarbonCommonConstants.CARBON_PARTITION_MAX_DRIVER_LRU_CACHE_SIZE, + CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT) + + val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName) + + def get(identifier: PartitionCacheKey): java.util.List[CatalogTablePartition] = { + LOGGER.info("Reading partition values from store") + // read the tableStatus file to get valid and invalid segments + val validInvalidSegments = new SegmentStatusManager(AbsoluteTableIdentifier.from( + identifier.tablePath, null, null, identifier.tableId)) + .getValidAndInvalidSegments + val cacheablePartitionSpecs = validInvalidSegments.getValidSegments.asScala.map { segment => + val segmentFileName = segment.getSegmentFileName + val segmentFilePath = FileFactory.getCarbonFile( + CarbonTablePath.getSegmentFilePath(identifier.tablePath, segmentFileName)) + // read the last modified time + val segmentFileModifiedTime = segmentFilePath.getLastModifiedTime + val existingCache = 
CACHE.get(identifier.tableId) + if (existingCache != null) { + val segmentCache = CACHE.get(identifier.tableId).asInstanceOf[CacheablePartitionSpec] + .partitionSpecs.get(segment.getSegmentNo) + segmentCache match { + case Some(c) => + // check if cache needs to be updated + if (segmentFileModifiedTime > c._2) { Review comment: TableSegmentRefresher is in core module, cannot add there ########## File path: integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManger.scala ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util + +import java.net.URI +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.log4j.Logger +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.cache.{Cache, Cacheable, CarbonLRUCache} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.index.Segment +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object PartitionCacheManager extends Cache[PartitionCacheKey, + java.util.List[CatalogTablePartition]] { + + private val CACHE = new CarbonLRUCache( + CarbonCommonConstants.CARBON_PARTITION_MAX_DRIVER_LRU_CACHE_SIZE, + CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT) + + val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName) + + def get(identifier: PartitionCacheKey): java.util.List[CatalogTablePartition] = { + LOGGER.info("Reading partition values from store") + // read the tableStatus file to get valid and invalid segments + val validInvalidSegments = new SegmentStatusManager(AbsoluteTableIdentifier.from( + identifier.tablePath, null, null, identifier.tableId)) + .getValidAndInvalidSegments + val cacheablePartitionSpecs = validInvalidSegments.getValidSegments.asScala.map { segment => + val segmentFileName = segment.getSegmentFileName + val segmentFilePath = FileFactory.getCarbonFile( + CarbonTablePath.getSegmentFilePath(identifier.tablePath, segmentFileName)) + // read the last modified time + val segmentFileModifiedTime = segmentFilePath.getLastModifiedTime + val existingCache = 
CACHE.get(identifier.tableId) + if (existingCache != null) { + val segmentCache = CACHE.get(identifier.tableId).asInstanceOf[CacheablePartitionSpec] + .partitionSpecs.get(segment.getSegmentNo) + segmentCache match { + case Some(c) => + // check if cache needs to be updated + if (segmentFileModifiedTime > c._2) { + (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } else { + (segment.getSegmentNo, c) + } + case None => + (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } + } else { + // read the partitions if not available in cache. + (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } + }.toMap + // remove all invalid segment entries from cache + val finalCache = cacheablePartitionSpecs -- + validInvalidSegments.getInvalidSegments.asScala.map(_.getSegmentNo) Review comment: This removes the entry of any invalid segment (such as a deleted segment) from cacheablePartitionSpecs. Since cacheablePartitionSpecs is a map of (segmentNo -> PartitionSpecs), I am removing all the invalid segments from the final cache. Does this address your comment? ########## File path: integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManger.scala ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.net.URI +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.log4j.Logger +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.cache.{Cache, Cacheable, CarbonLRUCache} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.index.Segment +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object PartitionCacheManager extends Cache[PartitionCacheKey, + java.util.List[CatalogTablePartition]] { + + private val CACHE = new CarbonLRUCache( + CarbonCommonConstants.CARBON_PARTITION_MAX_DRIVER_LRU_CACHE_SIZE, + CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT) + + val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName) + + def get(identifier: PartitionCacheKey): java.util.List[CatalogTablePartition] = { + LOGGER.info("Reading partition values from store") + // read the tableStatus file to get valid and invalid segments + val validInvalidSegments = new SegmentStatusManager(AbsoluteTableIdentifier.from( + identifier.tablePath, null, null, identifier.tableId)) + .getValidAndInvalidSegments + 
val cacheablePartitionSpecs = validInvalidSegments.getValidSegments.asScala.map { segment => + val segmentFileName = segment.getSegmentFileName + val segmentFilePath = FileFactory.getCarbonFile( + CarbonTablePath.getSegmentFilePath(identifier.tablePath, segmentFileName)) + // read the last modified time + val segmentFileModifiedTime = segmentFilePath.getLastModifiedTime + val existingCache = CACHE.get(identifier.tableId) Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManger.scala ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util + +import java.net.URI +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.log4j.Logger +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.cache.{Cache, Cacheable, CarbonLRUCache} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.index.Segment +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object PartitionCacheManager extends Cache[PartitionCacheKey, + java.util.List[CatalogTablePartition]] { + + private val CACHE = new CarbonLRUCache( + CarbonCommonConstants.CARBON_PARTITION_MAX_DRIVER_LRU_CACHE_SIZE, + CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT) + + val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName) + + def get(identifier: PartitionCacheKey): java.util.List[CatalogTablePartition] = { + LOGGER.info("Reading partition values from store") + // read the tableStatus file to get valid and invalid segments + val validInvalidSegments = new SegmentStatusManager(AbsoluteTableIdentifier.from( + identifier.tablePath, null, null, identifier.tableId)) + .getValidAndInvalidSegments + val cacheablePartitionSpecs = validInvalidSegments.getValidSegments.asScala.map { segment => + val segmentFileName = segment.getSegmentFileName + val segmentFilePath = FileFactory.getCarbonFile( + CarbonTablePath.getSegmentFilePath(identifier.tablePath, segmentFileName)) + // read the last modified time + val segmentFileModifiedTime = segmentFilePath.getLastModifiedTime + val existingCache = 
CACHE.get(identifier.tableId) + if (existingCache != null) { + val segmentCache = CACHE.get(identifier.tableId).asInstanceOf[CacheablePartitionSpec] Review comment: done ########## File path: integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util + +import java.net.URI +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.log4j.Logger +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.cache.{Cache, Cacheable, CarbonLRUCache} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.index.Segment +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object PartitionCacheManager extends Cache[PartitionCacheKey, + java.util.List[CatalogTablePartition]] { + + private val CACHE = new CarbonLRUCache( + CarbonCommonConstants.CARBON_PARTITION_MAX_DRIVER_LRU_CACHE_SIZE, + CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT) + + val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName) + + def get(identifier: PartitionCacheKey): java.util.List[CatalogTablePartition] = { + LOGGER.info("Reading partition values from store") + // read the tableStatus file to get valid and invalid segments + val validInvalidSegments = new SegmentStatusManager(AbsoluteTableIdentifier.from( + identifier.tablePath, null, null, identifier.tableId)) + .getValidAndInvalidSegments + val cacheablePartitionSpecs = validInvalidSegments.getValidSegments.asScala.map { segment => + val segmentFileName = segment.getSegmentFileName + val segmentFilePath = FileFactory.getCarbonFile( + CarbonTablePath.getSegmentFilePath(identifier.tablePath, segmentFileName)) + // read the last modified time + val segmentFileModifiedTime = segmentFilePath.getLastModifiedTime + val existingCache = 
CACHE.get(identifier.tableId) + if (existingCache != null) { + val segmentCache = CACHE.get(identifier.tableId).asInstanceOf[CacheablePartitionSpec] + .partitionSpecs.get(segment.getSegmentNo) + segmentCache match { + case Some(c) => + // check if cache needs to be updated + if (segmentFileModifiedTime > c._2) { + (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } else { + (segment.getSegmentNo, c) + } + case None => + (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } + } else { + // read the partitions if not available in cache. + (segment.getSegmentNo, (readPartition(identifier, Review comment: @ajantha-bhat ########## File path: integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util + +import java.net.URI +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.log4j.Logger +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.cache.{Cache, Cacheable, CarbonLRUCache} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.index.Segment +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object PartitionCacheManager extends Cache[PartitionCacheKey, + java.util.List[CatalogTablePartition]] { + + private val CACHE = new CarbonLRUCache( + CarbonCommonConstants.CARBON_PARTITION_MAX_DRIVER_LRU_CACHE_SIZE, + CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT) + + val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName) + + def get(identifier: PartitionCacheKey): java.util.List[CatalogTablePartition] = { + LOGGER.info("Reading partition values from store") + // read the tableStatus file to get valid and invalid segments + val validInvalidSegments = new SegmentStatusManager(AbsoluteTableIdentifier.from( + identifier.tablePath, null, null, identifier.tableId)) + .getValidAndInvalidSegments + val cacheablePartitionSpecs = validInvalidSegments.getValidSegments.asScala.map { segment => + val segmentFileName = segment.getSegmentFileName + val segmentFilePath = FileFactory.getCarbonFile( + CarbonTablePath.getSegmentFilePath(identifier.tablePath, segmentFileName)) + // read the last modified time + val segmentFileModifiedTime = segmentFilePath.getLastModifiedTime + val existingCache = 
CACHE.get(identifier.tableId) + if (existingCache != null) { + val segmentCache = CACHE.get(identifier.tableId).asInstanceOf[CacheablePartitionSpec] + .partitionSpecs.get(segment.getSegmentNo) + segmentCache match { + case Some(c) => + // check if cache needs to be updated + if (segmentFileModifiedTime > c._2) { + (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } else { + (segment.getSegmentNo, c) + } + case None => + (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } + } else { + // read the partitions if not available in cache. + (segment.getSegmentNo, (readPartition(identifier, Review comment: @ajantha-bhat `sparkSession.sessionState.catalog.listPartitionsByFilter(identifier, partitionFilters)` is already being used to filter the partitions based on the filter, but in concurrent queries Spark has to take a lock on the metastore before accessing it, so the query performance was being degraded. Also, no user will have 200k segments; they need to compact anyway, otherwise the query performance would be degraded. ########## File path: integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.net.URI +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.log4j.Logger +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition} + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.cache.{Cache, Cacheable, CarbonLRUCache} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.index.Segment +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager} +import org.apache.carbondata.core.util.path.CarbonTablePath + +object PartitionCacheManager extends Cache[PartitionCacheKey, + java.util.List[CatalogTablePartition]] { + + private val CACHE = new CarbonLRUCache( + CarbonCommonConstants.CARBON_PARTITION_MAX_DRIVER_LRU_CACHE_SIZE, + CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT) + + val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName) + + def get(identifier: PartitionCacheKey): java.util.List[CatalogTablePartition] = { + LOGGER.info("Reading partition values from store") + // read the tableStatus file to get valid and invalid segments + val validInvalidSegments = new SegmentStatusManager(AbsoluteTableIdentifier.from( + identifier.tablePath, null, null, identifier.tableId)) + .getValidAndInvalidSegments + val cacheablePartitionSpecs = validInvalidSegments.getValidSegments.asScala.map { segment => + val segmentFileName = segment.getSegmentFileName + val segmentFilePath = FileFactory.getCarbonFile( + CarbonTablePath.getSegmentFilePath(identifier.tablePath, segmentFileName)) + // read the last modified time + val 
segmentFileModifiedTime = segmentFilePath.getLastModifiedTime + val existingCache = CACHE.get(identifier.tableId) + if (existingCache != null) { + val segmentCache = CACHE.get(identifier.tableId).asInstanceOf[CacheablePartitionSpec] + .partitionSpecs.get(segment.getSegmentNo) + segmentCache match { + case Some(c) => + // check if cache needs to be updated + if (segmentFileModifiedTime > c._2) { + (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } else { + (segment.getSegmentNo, c) + } + case None => + (segment.getSegmentNo, (readPartition(identifier, + segmentFilePath.getAbsolutePath), segmentFileModifiedTime)) + } + } else { + // read the partitions if not available in cache. + (segment.getSegmentNo, (readPartition(identifier, Review comment: Yeah, because they are using Presto, where compaction is not yet supported. If they are using the Hive metastore, they will surely face degradation in concurrent queries. ########## File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ########## @@ -585,6 +587,30 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte } } + test("test partition caching") { + CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "false") + sql("drop table if exists partition_cache") + sql("create table partition_cache(a string) partitioned by(b int) stored as carbondata") + sql("insert into partition_cache select 'k',1") + sql("select * from partition_cache where b = 1").collect() Review comment: done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
