Github user gvramana commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1984#discussion_r169641109
--- Diff:
core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import
org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Provides read and write support for the segment file associated with each segment.
+ */
+public class SegmentFileStore {
+
+ private SegmentFile segmentFile;
+
+ private Map<String, List<String>> indexFilesMap;
+
+ private String tablePath;
+
+ /**
+ * Write segment information to the segment folder with indexfilename and
+ * corresponding partitions.
+ */
+ public void writeSegmentFile(String tablePath, final String taskNo,
String location,
+ String timeStamp, List<String> partionNames) throws IOException {
+ String tempFolderLoc = timeStamp + ".tmp";
+ String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath)
+ "/" + tempFolderLoc;
+ CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
+ if (!carbonFile.exists()) {
+ carbonFile.mkdirs(writePath, FileFactory.getFileType(writePath));
+ }
+ CarbonFile tempFolder =
+ FileFactory.getCarbonFile(location +
CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+ boolean isRelative = false;
+ if (location.startsWith(tablePath)) {
+ location = location.substring(tablePath.length(), location.length());
+ isRelative = true;
+ }
+ if (tempFolder.exists() && partionNames.size() > 0) {
+ CarbonFile[] carbonFiles = tempFolder.listFiles(new
CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().startsWith(taskNo) && file.getName()
+ .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+ }
+ });
+ if (carbonFiles != null && carbonFiles.length > 0) {
+ SegmentFile segmentFile = new SegmentFile();
+ Map<String, FolderDetails> locationMap = new HashMap<>();
+ FolderDetails folderDetails = new FolderDetails();
+ folderDetails.setRelative(isRelative);
+ folderDetails.setPartitions(partionNames);
+ folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+ for (CarbonFile file : carbonFiles) {
+ folderDetails.getFiles().add(file.getName());
+ }
+ locationMap.put(location, folderDetails);
+ segmentFile.setLocationMap(locationMap);
+ String path = writePath + "/" + taskNo +
CarbonTablePath.SEGMENT_EXT;
+ // write segment info to new file.
+ writeSegmentFile(segmentFile, path);
+ }
+ }
+ }
+
+ /**
+  * Serializes the given segment file to JSON with Gson and writes it to {@code path}.
+  * <p>
+  * The target is opened through {@link AtomicFileOperations} with
+  * {@code FileWriteOperation.OVERWRITE} and the text is encoded with
+  * {@code CarbonCommonConstants.DEFAULT_CHARSET}.
+  *
+  * @param segmentFile segment metadata to serialize
+  * @param path        location of the file to write
+  * @throws IOException if opening, writing, flushing or closing the file fails
+  */
+ public void writeSegmentFile(SegmentFile segmentFile, String path) throws IOException {
+   AtomicFileOperations fileWrite =
+       new AtomicFileOperationsImpl(path, FileFactory.getFileType(path));
+   BufferedWriter brWriter = null;
+   Gson gsonObjectToWrite = new Gson();
+   try {
+     DataOutputStream dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+     brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+         Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+     String metadataInstance = gsonObjectToWrite.toJson(segmentFile);
+     brWriter.write(metadataInstance);
+     // Flush here rather than in finally: a failing flush must neither skip the cleanup
+     // below nor replace an exception already thrown by write().
+     brWriter.flush();
+   } finally {
+     try {
+       CarbonUtil.closeStreams(brWriter);
+     } finally {
+       // Always release the atomic file operation, even if closing the writer failed.
+       fileWrite.close();
+     }
+   }
+ }
+
+ /**
+ * Merge all segment files in a segment to single file.
+ *
+ * @param writePath
+ * @throws IOException
+ */
+ public SegmentFile mergeSegmentFiles(String readPath, String
mergeFileName, String writePath)
+ throws IOException {
+ CarbonFile[] segmentFiles = getSegmentFiles(readPath);
+ if (segmentFiles != null && segmentFiles.length > 0) {
+ SegmentFile segmentFile = null;
+ for (CarbonFile file : segmentFiles) {
+ SegmentFile localSegmentFile =
readSegmentFile(file.getAbsolutePath());
+ if (segmentFile == null && localSegmentFile != null) {
+ segmentFile = localSegmentFile;
+ }
+ if (localSegmentFile != null) {
+ segmentFile = segmentFile.merge(localSegmentFile);
+ }
+ }
+ if (segmentFile != null) {
+ String path = writePath + "/" + mergeFileName +
CarbonTablePath.SEGMENT_EXT;
+ writeSegmentFile(segmentFile, path);
+
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(readPath));
--- End diff --
Cleanup is required for the temporary folders if abortJob is not called when
the process is killed.
---