kunal642 commented on code in PR #4261:
URL: https://github.com/apache/carbondata/pull/4261#discussion_r877785306


##########
integration/spark/src/main/scala/org/apache/carbondata/recovery/tablestatus/TableStatusRecovery.scala:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.recovery.tablestatus
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+object TableStatusRecovery {
+  def main(args: Array[String]): Unit = {
+    // check the argument contains database name and tablename to recover table status file
+    assert(args.length == 2)
+    createCarbonSession()
+    val sparkSession = SparkSQLUtil.getSparkSession
+    val tableName = args(1)
+    val databaseName = args(0)
+    // get carbon table to start table status recovery
+    val carbonTable = try {
+      CarbonEnv.getCarbonTable(Some(databaseName), tableName)(sparkSession)
+    } catch {
+      case ex: Exception =>
+        throw ex
+    }
+
+    /**
+     * 1. get the current table status version file name associated with carbon table
+     * 2. Check if the current table status version file exists
+     * 3. If does not exists, then read all the old table status version files and find the last
+     *    recent version file and get the load metadata details. For the lost load metadata,
+     *    read the segment files and table status update files to recover the lost
+     *    load metadata entry and add it to previous version load metadata details list.
+     * 4. Write the load metadata details list with version name as [Step:1]
+     * */
+    val tableStatusVersion = carbonTable.getTableStatusVersion
+    val tableStatusPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath,
+      tableStatusVersion)
+    val tableStatusFile = FileFactory.getCarbonFile(
+      FileFactory.getUpdatedFilePath(tableStatusPath))
+    if (!tableStatusFile.exists()) {
+      // case where the current version table status file is lost, then get the previous table
+      // status version file and update it as the current table status version
+      val tableStatusFiles = CarbonScalaUtil.getTableStatusVersionFiles(carbonTable.getTablePath)
+      // read the segment files in the Metadata directory
+      val segmentFileDir = FileFactory.getCarbonFile(FileFactory.getUpdatedFilePath(
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath)))
+      val segmentFiles = segmentFileDir.listFiles()
+        .map(_.getName)
+        .filter(segmentFileName => segmentFileName.endsWith(CarbonTablePath.SEGMENT_EXT))

Review Comment:
   use collect instead of map + filter
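   e.g. something like this (rough, untested sketch reusing the existing names):
   ```scala
   // collect filters and maps in a single pass over the listed files
   val segmentFiles = segmentFileDir.listFiles()
     .collect {
       case file if file.getName.endsWith(CarbonTablePath.SEGMENT_EXT) => file.getName
     }
     .toList
   ```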



##########
integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala:
##########
@@ -206,6 +212,7 @@ object StreamSinkFactory {
       carbonLoadModel,
       hadoopConf)
     carbonLoadModel.setSegmentId(segmentId)
+    carbonLoadModel.setLatestTableStatusVersion(System.currentTimeMillis().toString)

Review Comment:
   same as previous comment



##########
core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java:
##########
@@ -629,17 +630,19 @@ public static boolean updateTableStatusFile(CarbonTable carbonTable, String segm
    */
   public static boolean updateTableStatusFile(CarbonTable carbonTable, String segmentId,
       String segmentFile, String tableId, SegmentFileStore segmentFileStore,
-      SegmentStatus segmentStatus) throws IOException {
+      SegmentStatus segmentStatus, String tblStatusVersion) throws IOException {
     boolean status = false;
     String tablePath = carbonTable.getTablePath();
-    String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);
+    String tableStatusPath =
+        CarbonTablePath.getTableStatusFilePath(tablePath, tblStatusVersion);
     if (!FileFactory.isFileExist(tableStatusPath)) {
       return status;
     }
     String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
     AbsoluteTableIdentifier absoluteTableIdentifier =
         AbsoluteTableIdentifier.from(tablePath, null, null, tableId);
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager =
+        new SegmentStatusManager(absoluteTableIdentifier, carbonTable.getTableStatusVersion());

Review Comment:
   what is the difference between `carbonTable.getTableStatusVersion()` and the `tblStatusVersion` passed as an argument to this method?



##########
integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala:
##########
@@ -546,6 +546,7 @@ object DataLoadProcessBuilderOnSpark {
     if (globalSortPartitions != null) {
       loadModel.setGlobalSortPartitions(globalSortPartitions)
     }
+    loadModel.setLatestTableStatusVersion(System.currentTimeMillis().toString)

Review Comment:
   this will fail in case of concurrent loads, as the timestamps can be the same. Better to get the time inside the lock, just before writing the table status file.
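   e.g. a minimal sketch, assuming the table status lock can be taken at this point (`carbonTable` below is just a stand-in for whatever table reference is available here; the lock helpers are the existing `SegmentStatusManager`/`ICarbonLock` APIs, exact placement is up to you):
   ```scala
   // generate the version only after the table status lock is acquired,
   // so two concurrent loads cannot pick the same timestamp
   val statusLock = SegmentStatusManager.getTableStatusLock(carbonTable.getAbsoluteTableIdentifier)
   if (statusLock.lockWithRetries()) {
     try {
       loadModel.setLatestTableStatusVersion(System.currentTimeMillis().toString)
       // ... write the table status file for this version while still holding the lock ...
     } finally {
       statusLock.unlock()
     }
   }
   ```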



##########
core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java:
##########
@@ -364,6 +380,9 @@ public static LoadMetadataDetails[] readTableStatusFile(String tableStatusPath)
         if (retry == 0) {
          // we have retried several times, throw this exception to make the execution failed
           LOG.error("Failed to read table status file:" + tableStatusPath);
+          if (ex.getMessage().contains("Table Status Version file")) {

Review Comment:
   why is this check required?



##########
integration/spark/src/main/scala/org/apache/carbondata/recovery/tablestatus/TableStatusRecovery.scala:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.recovery.tablestatus
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+object TableStatusRecovery {
+  def main(args: Array[String]): Unit = {
+    // check the argument contains database name and tablename to recover table status file
+    assert(args.length == 2)
+    createCarbonSession()
+    val sparkSession = SparkSQLUtil.getSparkSession
+    val tableName = args(1)
+    val databaseName = args(0)
+    // get carbon table to start table status recovery
+    val carbonTable = try {
+      CarbonEnv.getCarbonTable(Some(databaseName), tableName)(sparkSession)
+    } catch {
+      case ex: Exception =>
+        throw ex
+    }
+
+    /**
+     * 1. get the current table status version file name associated with carbon table
+     * 2. Check if the current table status version file exists
+     * 3. If does not exists, then read all the old table status version files and find the last
+     *    recent version file and get the load metadata details. For the lost load metadata,
+     *    read the segment files and table status update files to recover the lost
+     *    load metadata entry and add it to previous version load metadata details list.
+     * 4. Write the load metadata details list with version name as [Step:1]
+     * */
+    val tableStatusVersion = carbonTable.getTableStatusVersion
+    val tableStatusPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath,
+      tableStatusVersion)
+    val tableStatusFile = FileFactory.getCarbonFile(
+      FileFactory.getUpdatedFilePath(tableStatusPath))
+    if (!tableStatusFile.exists()) {
+      // case where the current version table status file is lost, then get the previous table
+      // status version file and update it as the current table status version
+      val tableStatusFiles = CarbonScalaUtil.getTableStatusVersionFiles(carbonTable.getTablePath)
+      // read the segment files in the Metadata directory
+      val segmentFileDir = FileFactory.getCarbonFile(FileFactory.getUpdatedFilePath(
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath)))
+      val segmentFiles = segmentFileDir.listFiles()
+        .map(_.getName)
+        .filter(segmentFileName => segmentFileName.endsWith(CarbonTablePath.SEGMENT_EXT))
+        .toList
+      if (tableStatusFiles.isEmpty) {
+        if (segmentFiles.isEmpty) {
+          // no metadata found to recover table status file
+          throw new Exception(

Review Comment:
   throw different exceptions so we can tell whether the table status version files or the segment files are missing
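   e.g. a rough sketch (messages are placeholders; note the second branch covers a case the current nested check does not throw for, keep or drop it as fits the intended semantics):
   ```scala
   // distinguish the two missing-metadata cases instead of one generic message
   if (tableStatusFiles.isEmpty && segmentFiles.isEmpty) {
     throw new Exception("Neither table status version files nor segment files found to recover load metadata")
   } else if (segmentFiles.isEmpty) {
     throw new Exception("No segment files found under the Metadata directory to recover load metadata")
   }
   ```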



##########
integration/spark/src/main/scala/org/apache/carbondata/recovery/tablestatus/TableStatusRecovery.scala:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.recovery.tablestatus
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+object TableStatusRecovery {
+  def main(args: Array[String]): Unit = {
+    // check the argument contains database name and tablename to recover table status file
+    assert(args.length == 2)
+    createCarbonSession()
+    val sparkSession = SparkSQLUtil.getSparkSession
+    val tableName = args(1)
+    val databaseName = args(0)
+    // get carbon table to start table status recovery
+    val carbonTable = try {
+      CarbonEnv.getCarbonTable(Some(databaseName), tableName)(sparkSession)
+    } catch {
+      case ex: Exception =>
+        throw ex
+    }
+
+    /**
+     * 1. get the current table status version file name associated with carbon table
+     * 2. Check if the current table status version file exists
+     * 3. If does not exists, then read all the old table status version files and find the last
+     *    recent version file and get the load metadata details. For the lost load metadata,
+     *    read the segment files and table status update files to recover the lost
+     *    load metadata entry and add it to previous version load metadata details list.
+     * 4. Write the load metadata details list with version name as [Step:1]
+     * */
+    val tableStatusVersion = carbonTable.getTableStatusVersion
+    val tableStatusPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath,
+      tableStatusVersion)
+    val tableStatusFile = FileFactory.getCarbonFile(
+      FileFactory.getUpdatedFilePath(tableStatusPath))
+    if (!tableStatusFile.exists()) {
+      // case where the current version table status file is lost, then get the previous table
+      // status version file and update it as the current table status version
+      val tableStatusFiles = CarbonScalaUtil.getTableStatusVersionFiles(carbonTable.getTablePath)
+      // read the segment files in the Metadata directory
+      val segmentFileDir = FileFactory.getCarbonFile(FileFactory.getUpdatedFilePath(
+        CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath)))
+      val segmentFiles = segmentFileDir.listFiles()
+        .map(_.getName)
+        .filter(segmentFileName => segmentFileName.endsWith(CarbonTablePath.SEGMENT_EXT))
+        .toList
+      if (tableStatusFiles.isEmpty) {
+        if (segmentFiles.isEmpty) {
+          // no metadata found to recover table status file
+          throw new Exception(
+            "Table Status Version File/ Segment Files does not exists to recover load metadata")
+        }
+      }
+      // prepare segment to latest timestamp version map. This is required, in case of drop
+      // partition, where there can be multiple segment files for same segment Id
+      val segToTimeStampMap = new util.HashMap[String, String]()
+      segmentFiles.foreach { segmentFile =>
+        val segmentToTimestamp = segmentFile.trim.split(CarbonCommonConstants.UNDERSCORE).toList
+        if (!segToTimeStampMap.containsKey(segmentToTimestamp.head)) {
+          segToTimeStampMap.put(segmentToTimestamp.head, segmentToTimestamp.last)
+        } else {
+          val timeStamp = segToTimeStampMap.get(segmentToTimestamp.head)
+          if (timeStamp <= segmentToTimestamp.last) {
+            segToTimeStampMap.put(segmentToTimestamp.head, segmentToTimestamp.last)
+          }
+        }
+      }
+      // iterate the available table status version files and find the most recent table status
+      // version file
+      val latestTableStatusVersionStr = CarbonScalaUtil.getLatestTblStatusVersionBasedOnTimestamp(
+        tableStatusFiles)
+
+      // read the load metadata details with the identified table status version file
+      var loadMetaDetails = SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(
+        carbonTable.getTablePath), latestTableStatusVersionStr).toList
+
+      var updateMetaDetails: Array[SegmentUpdateDetails] = Array.empty
+
+      val tableUpdateStatusFiles = FileFactory.getCarbonFile(CarbonTablePath.getMetadataPath(
+        carbonTable.getTablePath)).listFiles(new CarbonFileFilter {
+        override def accept(file: CarbonFile): Boolean = {
+          file.getName.startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)
+        }
+      })
+
+      // if table has table update status files, iterate and identify the latest table status
+      // update file
+      if (tableUpdateStatusFiles.nonEmpty) {
+        var latestTableUpdateStatusVersion = 0L
+        tableUpdateStatusFiles.foreach { tableStatusFile =>
+          val updateVersionTimeStamp = tableStatusFile.getName
+            .substring(tableStatusFile.getName.indexOf(CarbonCommonConstants.HYPHEN) + 1,
+              tableStatusFile.getName.length).toLong
+          if (latestTableUpdateStatusVersion <= updateVersionTimeStamp) {
+            latestTableUpdateStatusVersion = updateVersionTimeStamp
+          }
+        }
+        updateMetaDetails = SegmentUpdateStatusManager.readLoadMetadata(
+          CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME + CarbonCommonConstants.HYPHEN +
+          latestTableUpdateStatusVersion.toString, carbonTable.getTablePath)
+      }
+
+      // check which segment is missing from lost table status version
+      val missedLoadMetaDetails: util.List[LoadMetadataDetails] =
+        new util.ArrayList[LoadMetadataDetails]()
+      segToTimeStampMap.asScala.foreach { segmentFileEntry =>
+        val segmentFileName = segmentFileEntry._1 + CarbonCommonConstants.UNDERSCORE +
+                              segmentFileEntry._2
+        val segmentId = segmentFileEntry._1
+        val segmentUpdateDetail = updateMetaDetails
+          .filter(_.getSegmentName.equalsIgnoreCase(segmentId))
+        // check if the segment Id from segment file entry exists in load metadata details list.
+        // If does not exist, or if the segment file mapped to the load metadata entry and the
+        // latest segment file timestamp is not same, then prepare new load metadata.
+        if ((!loadMetaDetails.exists(_.getLoadName.equalsIgnoreCase(segmentId))
+             || !loadMetaDetails.filter(_.getLoadName.equalsIgnoreCase(segmentId))
+          .head.getSegmentFile.equalsIgnoreCase(segmentFileName)) &&
+            !segmentId.contains(CarbonCommonConstants.POINT)) {
+          val segFilePath = CarbonTablePath.getSegmentFilePath(
+            carbonTable.getTablePath, segmentFileName)
+          // read segment file and prepare load metadata
+          val segmentFile = SegmentFileStore.readSegmentFile(segFilePath)
+          val loadMetadataDetail = new LoadMetadataDetails()
+          val segmentInfo = segmentFile.getLocationMap.asScala.head._2
+          if (!segmentUpdateDetail.isEmpty) {
+            loadMetadataDetail.setSegmentStatus(segmentUpdateDetail.head.getSegmentStatus)
+            loadMetadataDetail.setModificationOrDeletionTimestamp(segmentUpdateDetail.head
+              .getDeleteDeltaStartTimeAsLong)
+          } else {
+            loadMetadataDetail.setSegmentStatus(getSegmentStatus(segmentInfo.getStatus))
+          }
+          loadMetadataDetail.setLoadName(segmentId)
+          loadMetadataDetail.setSegmentFile(segmentFileName)
+          val dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTable
+            .getTablePath, new Segment(segmentId, segmentFileName))
+          loadMetadataDetail.setDataSize(dataIndexSize
+            .get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE).toString)
+          loadMetadataDetail.setIndexSize(dataIndexSize
+            .get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE).toString)
+          loadMetadataDetail.setLoadEndTime(FileFactory
+            .getCarbonFile(segFilePath)
+            .getLastModifiedTime)
+          missedLoadMetaDetails.add(loadMetadataDetail)
+          if (loadMetaDetails.exists(_.getLoadName.equalsIgnoreCase(segmentId))) {
+            loadMetaDetails = loadMetaDetails.filterNot(_.getLoadName
+              .equalsIgnoreCase(segmentId))
+          }
+        } else if (!segmentUpdateDetail.isEmpty) {
+          // in case of Update/delete, update the already existing load metadata entry with the
+          // latest segment update detail
+          val loadMetadataDetail = loadMetaDetails
+            .filter(_.getLoadName.equalsIgnoreCase(segmentId))
+            .head

Review Comment:
   use find() instead of filter + head
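   e.g. (rough sketch):
   ```scala
   // find stops at the first match and avoids the unsafe .head on a possibly empty list
   val loadMetadataDetail = loadMetaDetails
     .find(_.getLoadName.equalsIgnoreCase(segmentId))
     .getOrElse(throw new Exception("No load metadata entry found for segment " + segmentId))
   ```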



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
