ravipesala commented on a change in pull request #3179: [CARBONDATA-3338] Support Incremental DataLoad for MV Datamap[with single parent table]
URL: https://github.com/apache/carbondata/pull/3179#discussion_r278069323
 
 

 ##########
 File path: core/src/main/java/org/apache/carbondata/core/datamap/status/DiskBasedDataMapStatusProvider.java
 ##########
 @@ -196,4 +207,278 @@ private static ICarbonLock getDataMapStatusLock() {
         
.getSystemLevelCarbonLockObj(CarbonProperties.getInstance().getSystemFolderLocation(),
             LockUsage.DATAMAP_STATUS_LOCK);
   }
+
+  /**
+   * Reads and returns dataMapSegmentStatusDetail
+   *
+   * @param dataMapSchema
+   * @throws IOException
+   */
+  public DataMapSegmentStatusDetail getDataMapSegmentStatus(DataMapSchema 
dataMapSchema)
+      throws IOException {
+    String statusPath = 
getDatamapSegmentStatusFile(dataMapSchema.getDataMapName());
+    Gson gsonObjectToRead = new Gson();
+    DataInputStream dataInputStream = null;
+    BufferedReader buffReader = null;
+    InputStreamReader inStream = null;
+    DataMapSegmentStatusDetail dataMapSegmentStatusDetail;
+    try {
+      if (!FileFactory.isFileExist(statusPath)) {
+        return new DataMapSegmentStatusDetail();
+      }
+      dataInputStream =
+          FileFactory.getDataInputStream(statusPath, 
FileFactory.getFileType(statusPath));
+      inStream = new InputStreamReader(dataInputStream,
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+      buffReader = new BufferedReader(inStream);
+      dataMapSegmentStatusDetail =
+          gsonObjectToRead.fromJson(buffReader, 
DataMapSegmentStatusDetail.class);
+    } catch (IOException e) {
+      LOG.error("Failed to read datamap segment status", e);
+      throw e;
+    } finally {
+      CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
+    }
+
+    if (null == dataMapSegmentStatusDetail) {
+      return new DataMapSegmentStatusDetail();
+    }
+    return dataMapSegmentStatusDetail;
+  }
+
+  /**
+   * After each data load to mv datamap, update the segment status mapping. 
Get the new load name
+   * from datamap table loadMetaDetails and map newly loaded main table 
segments against the datamap
+   * table new load entry
+   *
+   * @param dataMapSchema
+   * @throws IOException
+   */
+  public void updateSegmentMapping(DataMapSchema dataMapSchema) throws 
IOException {
+    DataMapSegmentStatusDetail dataMapSegmentStatus = 
getDataMapSegmentStatus(dataMapSchema);
+    List<RelationIdentifier> relationIdentifiers = 
dataMapSchema.getParentTables();
+    CarbonTable dataMapTable = CarbonTable
+        
.buildFromTablePath(dataMapSchema.getRelationIdentifier().getTableName(),
+            dataMapSchema.getRelationIdentifier().getDatabaseName(),
+            dataMapSchema.getRelationIdentifier().getTablePath(),
+            dataMapSchema.getRelationIdentifier().getTableId());
+    LoadMetadataDetails[] loadMetadataDetails =
+        SegmentStatusManager.readLoadMetadata(dataMapTable.getMetadataPath());
+    if (loadMetadataDetails.length != 0) {
+      String newLoadKey;
+      if (!dataMapSegmentStatus.getSegmentMapping().isEmpty()) {
+        for (LoadMetadataDetails entry : loadMetadataDetails) {
+          if (entry.getSegmentStatus() == SegmentStatus.MARKED_FOR_DELETE
+              || entry.getSegmentStatus() == SegmentStatus.COMPACTED) {
+            //In case of main table or datamap compaction, remove invalid 
entries
+            
dataMapSegmentStatus.getSegmentMapping().remove(entry.getLoadName());
+          }
+        }
+      } else {
+        dataMapSegmentStatus.setDataMapName(dataMapSchema.getDataMapName());
+      }
+      newLoadKey = loadMetadataDetails[loadMetadataDetails.length - 
1].getLoadName();
+      Map<String, List<String>> mainTableSegmentMap = new HashMap<>();
+      for (RelationIdentifier relationIdentifier : relationIdentifiers) {
+        List<String> validMainTableSegmentList =
+            DataMapStatusManager.getSegmentList(relationIdentifier);
+        List<String> datamapTableSegmentList =
+            getDataMapSegmentsFromMapping(dataMapSegmentStatus, 
relationIdentifier);
+        // Compare main table and datamap table valid segment list and collect 
newly loaded segments
+        // from main table to datamap table
+        validMainTableSegmentList.removeAll(datamapTableSegmentList);
+        mainTableSegmentMap.put(relationIdentifier.getTableName(), 
validMainTableSegmentList);
+      }
+      dataMapSegmentStatus.getSegmentMapping().put(newLoadKey, 
mainTableSegmentMap);
+      
dataMapSegmentStatus.setSegmentMapping(dataMapSegmentStatus.getSegmentMapping());
+      writeToSegmentStatusFile(dataMapSegmentStatus, 
dataMapSchema.getDataMapName());
+    }
+  }
+
+  /**
+   * write datamap to mainTbale segment mapping details
+   *
+   * @param dataMapSegmentStatus
+   * @param dataMapName
+   * @throws IOException
+   */
+  private void writeToSegmentStatusFile(DataMapSegmentStatusDetail 
dataMapSegmentStatus,
+      String dataMapName) throws IOException {
+    ICarbonLock carbonTableStatusLock = getDataMapStatusLock();
+    try {
+      if (carbonTableStatusLock.lockWithRetries()) {
+        writeSegmentDetailsIntoFile(getDatamapSegmentStatusFile(dataMapName), 
dataMapSegmentStatus);
+      } else {
+        String errorMsg = "Not able to acquire the lock for DataMap Segment 
status updation";
+        LOG.error(errorMsg);
+        throw new IOException(errorMsg);
+      }
+    } finally {
+      if (carbonTableStatusLock.unlock()) {
+        CarbonLockUtil.fileUnlock(carbonTableStatusLock, 
LockUsage.DATAMAP_STATUS_LOCK);
+      }
+    }
+  }
+
+  /**
+   * Returns list of segments of mainTable which are already loaded to MV 
dataMap table
+   *
+   * @param dataMapSegmentStatus
+   * @param relationIdentifier
+   */
+  public List<String> getDataMapSegmentsFromMapping(DataMapSegmentStatusDetail 
dataMapSegmentStatus,
 
 Review comment:
   This method also should not be part of it

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to