Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169635879
--- Diff:
core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provide read and write support for segment file associated with each
segment
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
  /**
   * Writes the segment metadata for one task to the segment-files folder, recording the
   * index files produced by this task and the partitions they belong to. The file is
   * written under a temporary "&lt;timeStamp&gt;.tmp" folder; a later merge step is expected
   * to consolidate the per-task files (see {@code mergeSegmentFiles}).
   *
   * @param tablePath    absolute table path
   * @param taskNo       task identifier; only index files whose names start with this
   *                     prefix are recorded, and it names the output ".segment" file
   * @param location     folder containing the task's staged index files
   * @param timeStamp    load timestamp used to name the temporary staging folder
   * @param partionNames partition values associated with this location; if empty,
   *                     nothing is written
   * @throws IOException if the segment file cannot be written
   */
  public void writeSegmentFile(String tablePath, final String taskNo, String location,
      String timeStamp, List<String> partionNames) throws IOException {
    String tempFolderLoc = timeStamp + ".tmp";
    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
    CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
    if (!carbonFile.exists()) {
      carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
    }
    CarbonFile tempFolder =
        FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
    // Locations inside the table path are stored relative to it, so the recorded
    // metadata stays valid if the table folder is relocated.
    boolean isRelative = false;
    if (location.startsWith(tablePath)) {
      location = location.substring(tablePath.length(), location.length());
      isRelative = true;
    }
    if (tempFolder.exists() && partionNames.size() > 0) {
      // Pick up only this task's index files from the staging folder.
      CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
        @Override public boolean accept(CarbonFile file) {
          return file.getName().startsWith(taskNo) && file.getName()
              .endsWith(CarbonTablePath.INDEX_FILE_EXT);
        }
      });
      if (carbonFiles != null && carbonFiles.length > 0) {
        SegmentFile segmentFile = new SegmentFile();
        Map<String, FolderDetails> locationMap = new HashMap<>();
        FolderDetails folderDetails = new FolderDetails();
        folderDetails.setRelative(isRelative);
        folderDetails.setPartitions(partionNames);
        folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
        for (CarbonFile file : carbonFiles) {
          folderDetails.getFiles().add(file.getName());
        }
        locationMap.put(location, folderDetails);
        segmentFile.setLocationMap(locationMap);
        String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
        // write segment info to new file.
        writeSegmentFile(segmentFile, path);
      }
    }
  }
+
+ /**
+ * Writes the segment file in json format
+ * @param segmentFile
+ * @param path
+ * @throws IOException
+ */
+ public void writeSegmentFile(SegmentFile segmentFile, String path)
throws IOException {
+ AtomicFileOperations fileWrite =
+ new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+ BufferedWriter brWriter = null;
+ DataOutputStream dataOutputStream = null;
+ Gson gsonObjectToWrite = new Gson();
+ try {
+ dataOutputStream =
fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new
OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+ brWriter.write(metadataInstance);
+ } finally {
+ if (null != brWriter) {
+ brWriter.flush();
+ }
+ CarbonUtil.closeStreams(brWriter);
+ fileWrite.close();
+ }
+ }
+
+ /**
+ * Merge all segment files in a segment to single file.
+ *
+ * @param writePath
+ * @throws IOException
+ */
+ public SegmentFile mergeSegmentFiles(String readPath, String
mergeFileName, String writePath)
+ throws IOException {
+ CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+ if (segmentFiles != null && segmentFiles.length > 0) {
+ SegmentFile segmentFile = null;
+ for (CarbonFile file : segmentFiles) {
+ SegmentFile localSegmentFile =
readSegmentFile(file.getAbsolutePath());
+ if (segmentFile == null && localSegmentFile != null) {
+ segmentFile = localSegmentFile;
+ }
+ if (localSegmentFile != null) {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ if (segmentFile != null) {
+ String path = writePath + "/" + mergeFileName +
CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, path);
+
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
+ }
+ return segmentFile;
+ }
+ return null;
+ }
+
+ private CarbonFile[] getSegmentFiles(String segmentPath) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+ if (carbonFile.exists()) {
+ return carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(CarbonTablePath.SEGMENT_EXT);
+ }
+ });
+ }
+ return null;
+ }
+
  /**
   * Builds a segment file covering only those partitions which actually have physical
   * carbon index files on disk; partitions with no index files are skipped entirely.
   *
   * @param tablePath      absolute table path, used to relativize partition locations
   * @param partitionSpecs partitions to scan for index files
   * @return the combined segment file, or null if no partition had any index file
   */
  public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
      List<PartitionSpec> partitionSpecs) {
    SegmentFile segmentFile = null;
    for (PartitionSpec spec : partitionSpecs) {
      String location = spec.getLocation().toString();
      CarbonFile carbonFile = FileFactory.getCarbonFile(location);
      // Locations inside the table path are stored relative so the metadata survives
      // a table-folder move.
      boolean isRelative = false;
      if (location.startsWith(tablePath)) {
        location = location.substring(tablePath.length(), location.length());
        isRelative = true;
      }
      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
        @Override public boolean accept(CarbonFile file) {
          return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
        }
      });
      if (listFiles != null && listFiles.length > 0) {
        SegmentFile localSegmentFile = new SegmentFile();
        Map<String, FolderDetails> locationMap = new HashMap<>();
        FolderDetails folderDetails = new FolderDetails();
        folderDetails.setRelative(isRelative);
        folderDetails.setPartitions(spec.getPartitions());
        folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
        // A merge-index file is recorded separately from the plain index files.
        for (CarbonFile file : listFiles) {
          if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
            folderDetails.setMergeFileName(file.getName());
          } else {
            folderDetails.getFiles().add(file.getName());
          }
        }
        locationMap.put(location, folderDetails);
        localSegmentFile.setLocationMap(locationMap);
        // Accumulate per-partition results into one segment file.
        if (segmentFile == null) {
          segmentFile = localSegmentFile;
        } else {
          segmentFile = segmentFile.merge(localSegmentFile);
        }
      }
    }
    return segmentFile;
  }
+
  /**
   * Reads and deserializes a segment file which is stored in json format.
   *
   * @param segmentFilePath absolute path of the segment file
   * @return the parsed segment file, or null if the file does not exist
   * @throws IOException if the file cannot be read
   */
  private SegmentFile readSegmentFile(String segmentFilePath) throws IOException {
    Gson gsonObjectToRead = new Gson();
    DataInputStream dataInputStream = null;
    BufferedReader buffReader = null;
    InputStreamReader inStream = null;
    SegmentFile segmentFile;
    AtomicFileOperations fileOperation =
        new AtomicFileOperationsImpl(segmentFilePath, FileFactory.getFileType(segmentFilePath));

    try {
      if (!FileFactory.isFileExist(segmentFilePath, FileFactory.getFileType(segmentFilePath))) {
        return null;
      }
      dataInputStream = fileOperation.openForRead();
      inStream = new InputStreamReader(dataInputStream,
          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
      buffReader = new BufferedReader(inStream);
      segmentFile = gsonObjectToRead.fromJson(buffReader, SegmentFile.class);
    } finally {
      // inStream != null implies all three streams were opened; closing the outermost
      // reader would suffice, but all are passed for safety.
      if (inStream != null) {
        CarbonUtil.closeStreams(buffReader, inStream, dataInputStream);
      }
    }

    return segmentFile;
  }
+
+ /**
+ * Reads segment file.
+ */
+ public void readSegment(String tablePath, String segmentFileName) throws
IOException {
+ String segmentFilePath =
+ CarbonTablePath.getSegmentFilesLocation(tablePath) +
CarbonCommonConstants.FILE_SEPARATOR
+ + segmentFileName;
+ SegmentFile segmentFile = readSegmentFile(segmentFilePath);
+ this.tablePath = tablePath;
+ this.segmentFile = segmentFile;
+ }
+
  /**
   * Returns the table path captured by the last {@code readSegment} call, or null if no
   * segment has been read yet.
   */
  public String getTablePath() {
    return tablePath;
  }
+
  /**
   * Returns the mapping from index-file path to the carbondata block file paths it
   * covers, as populated by {@code readIndexFiles}. The caller must invoke
   * {@code readIndexFiles} (after {@code readSegment}) first; otherwise this is null.
   *
   * @return index file to block-paths map, or null if index files were not yet read
   */
  public Map<String, List<String>> getIndexFilesMap() {
    return indexFilesMap;
  }
+
  /**
   * Reads all index files located in this segment, restricted to entries with SUCCESS
   * status. The caller must invoke {@code readSegment} first. Results are available via
   * {@code getIndexFilesMap}.
   *
   * @throws IOException if an index file cannot be read
   */
  public void readIndexFiles() throws IOException {
    readIndexFiles(SegmentStatus.SUCCESS, false);
  }
+
  /**
   * Reads all index files matching the given status and populates {@code indexFilesMap}
   * with, for each index file, the list of carbondata block file paths it references.
   * When {@code ignoreStatus} is true, all index files are read regardless of status.
   * No-op if the map was already populated (results are cached per instance).
   *
   * @param status       only entries with this status are read (unless ignoreStatus)
   * @param ignoreStatus if true, read index files of every status
   * @throws IOException if an index file cannot be read or parsed
   */
  private void readIndexFiles(SegmentStatus status, boolean ignoreStatus) throws IOException {
    if (indexFilesMap != null) {
      return;
    }
    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
    indexFilesMap = new HashMap<>();
    indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus);
    Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
    for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
      // Each index file's binary content is decoded into footers; every footer
      // contributes one carbondata block path.
      List<DataFileFooter> indexInfo =
          fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
      List<String> blocks = new ArrayList<>();
      for (DataFileFooter footer : indexInfo) {
        blocks.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
      }
      indexFilesMap.put(entry.getKey(), blocks);
    }
  }
+
  /**
   * Returns all index files of this segment from the already-read segment file, keyed by
   * the index file's absolute path and mapped to the folder's merge-file name (which may
   * be null when no merge index exists). Only entries with SUCCESS status are included.
   * The caller must invoke {@code readSegment} first; otherwise the result is empty.
   *
   * @return absolute index-file path to merge-file-name map (possibly empty)
   */
  public Map<String, String> getIndexFiles() {
    Map<String, String> indexFiles = new HashMap<>();
    if (segmentFile != null) {
      for (Map.Entry<String, FolderDetails> entry : getLocationMap().entrySet()) {
        String location = entry.getKey();
        // Relative locations were stored without the table path; restore it here.
        if (entry.getValue().isRelative) {
          location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
        }
        if (entry.getValue().status.equals(SegmentStatus.SUCCESS.getMessage())) {
          for (String indexFile : entry.getValue().getFiles()) {
            indexFiles.put(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile,
                entry.getValue().mergeFileName);
          }
        }
      }
    }
    return indexFiles;
  }
+
  /**
   * Drops the partition related files from the segment file of the segment and writes a
   * new segment file named "&lt;segmentId&gt;_&lt;uniqueId&gt;.segment". Iterates over the segment
   * file's locations, and marks each location matching a dropped partition as
   * MARKED_FOR_DELETE. The segment id is added to {@code toBeDeletedSegments} when no
   * SUCCESS location remains, and to {@code toBeUpdatedSegments} when any location was
   * marked.
   *
   * @param tablePath           absolute table path
   * @param segment             segment whose partitions are being dropped
   * @param partitionSpecs      partitions to drop
   * @param uniqueId            id used to name the new segment file version
   * @param toBeDeletedSegments out-param: collects segments that became fully deletable
   * @param toBeUpdatedSegments out-param: collects segments whose metadata changed
   * @throws IOException if reading or writing the segment file fails
   */
  public void dropPartitions(String tablePath, Segment segment, List<PartitionSpec> partitionSpecs,
      String uniqueId, List<String> toBeDeletedSegments, List<String> toBeUpdatedSegments)
      throws IOException {
    readSegment(tablePath, segment.getSegmentFileName());
    boolean updateSegment = false;
    for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
      String location = entry.getKey();
      // Relative locations were stored without the table path; restore it for comparison.
      if (entry.getValue().isRelative) {
        location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
      }
      Path path = new Path(location);
      // Update the status to delete if path equals a dropped partition's location.
      for (PartitionSpec spec : partitionSpecs) {
        if (path.equals(spec.getLocation())) {
          entry.getValue().setStatus(SegmentStatus.MARKED_FOR_DELETE.getMessage());
          updateSegment = true;
          break;
        }
      }
    }
    // Write the updated metadata to a new versioned segment file; the old file is kept.
    String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
    writePath =
        writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentId() + "_" + uniqueId
            + CarbonTablePath.SEGMENT_EXT;
    writeSegmentFile(segmentFile, writePath);
    // Check whether we can completely remove the segment: true only when no location
    // is left in SUCCESS status.
    boolean deleteSegment = true;
    for (Map.Entry<String, FolderDetails> entry : segmentFile.getLocationMap().entrySet()) {
      if (entry.getValue().getStatus().equals(SegmentStatus.SUCCESS.getMessage())) {
        deleteSegment = false;
      }
    }
    if (deleteSegment) {
      toBeDeletedSegments.add(segment.getSegmentId());
    }
    if (updateSegment) {
      toBeUpdatedSegments.add(segment.getSegmentId());
    }
  }
+
+ /**
+ * Update the table status file with the dropped partitions information
+ *
+ * @param carbonTable
+ * @param uniqueId
+ * @param toBeUpdatedSegments
+ * @param toBeDeleteSegments
+ * @throws IOException
+ */
+ public static void commitDropPartitions(CarbonTable carbonTable, String
uniqueId,
+ List<String> toBeUpdatedSegments, List<String> toBeDeleteSegments)
throws IOException {
+ Set<Segment> segmentSet = new HashSet<>(
--- End diff --
ok
---