Repository: carbondata
Updated Branches:
  refs/heads/master 6297ea0b4 -> 8fe165668


[CARBONDATA-2465] Improve the carbondata file reliability in data load when 
direct hdfs write is enabled

Problem: At present if we enable direct write on HDFS, data is written with 
replication of 1 which can cause data loss.
Solution: Write with cluster replication. With this change No need to invoke
CompleteHdfsBackendThread/completeRemainingHdfsReplicas for direct hdfs write 
case.

This closes #2235


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8fe16566
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8fe16566
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8fe16566

Branch: refs/heads/master
Commit: 8fe165668e2662455991f9de6af817ccc99b81ee
Parents: 6297ea0
Author: KanakaKumar <kanaka.av...@huawei.com>
Authored: Thu Apr 26 23:39:29 2018 +0530
Committer: kunal642 <kunalkapoor...@gmail.com>
Committed: Thu May 17 19:42:59 2018 +0530

----------------------------------------------------------------------
 .../apache/carbondata/core/util/CarbonUtil.java | 27 -----------
 .../store/writer/AbstractFactDataWriter.java    | 47 ++++++--------------
 2 files changed, 14 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fe16566/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index ac0a800..9dc4aa2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2817,33 +2817,6 @@ public final class CarbonUtil {
   }
 
   /**
-   * This method will complete the remaining hdfs replications
-   *
-   * @param fileName hdfs file name
-   * @param fileType filetype
-   * @throws CarbonDataWriterException if error occurs
-   */
-  public static void completeRemainingHdfsReplicas(String fileName, 
FileFactory.FileType fileType)
-    throws CarbonDataWriterException {
-    try {
-      long startTime = System.currentTimeMillis();
-      short replication = FileFactory.getDefaultReplication(fileName, 
fileType);
-      if (1 == replication) {
-        return;
-      }
-      boolean replicateFlag = FileFactory.setReplication(fileName, fileType, 
replication);
-      if (!replicateFlag) {
-        LOGGER.error("Failed to set replication for " + fileName + " with 
factor " + replication);
-      }
-      LOGGER.info(
-          "Total copy time (ms) to copy file " + fileName + " is " + 
(System.currentTimeMillis()
-              - startTime));
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem while completing remaining 
HDFS backups", e);
-    }
-  }
-
-  /**
    * This method will read the local carbon data file and write to carbon data 
file in HDFS
    *
    * @param carbonStoreFilePath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8fe16566/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 6e557cd..8115f97 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -176,6 +176,7 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
         CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
         
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT);
     this.enableDirectlyWriteData2Hdfs = 
"TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);
+
     if (enableDirectlyWriteData2Hdfs) {
       LOGGER.info("Carbondata will directly write fact data to HDFS.");
     } else {
@@ -274,22 +275,13 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
   protected void commitCurrentFile(boolean copyInCurrentThread) {
     notifyDataMapBlockEnd();
     CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
-    if (enableDirectlyWriteData2Hdfs) {
-      if (copyInCurrentThread) {
-        CarbonUtil.completeRemainingHdfsReplicas(carbonDataFileHdfsPath,
-            FileFactory.FileType.HDFS);
-      } else {
-        executorServiceSubmitList.add(executorService.submit(
-            new CompleteHdfsBackendThread(carbonDataFileHdfsPath, 
FileFactory.FileType.HDFS)));
-      }
-    } else {
+    if (!enableDirectlyWriteData2Hdfs) {
       if (copyInCurrentThread) {
         CarbonUtil.copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath,
-            model.getCarbonDataDirectoryPath(),
-            fileSizeInBytes);
+            model.getCarbonDataDirectoryPath(), fileSizeInBytes);
       } else {
         executorServiceSubmitList.add(executorService.submit(
-            new CompleteHdfsBackendThread(carbonDataFileTempPath, 
FileFactory.FileType.LOCAL)));
+            new CompleteHdfsBackendThread(carbonDataFileTempPath)));
       }
     }
   }
@@ -310,10 +302,9 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
       if (enableDirectlyWriteData2Hdfs) {
         // the block size will be twice the block_size specified by user to 
make sure that
         // one carbondata file only consists exactly one HDFS block.
-        // Here we write the first replication and will complete the remaining 
later.
-        fileOutputStream = 
FileFactory.getDataOutputStream(carbonDataFileHdfsPath,
-            FileFactory.FileType.HDFS, CarbonCommonConstants.BYTEBUFFER_SIZE, 
fileSizeInBytes * 2,
-            (short) 1);
+        fileOutputStream = FileFactory
+            .getDataOutputStream(carbonDataFileHdfsPath, 
FileFactory.FileType.HDFS,
+                CarbonCommonConstants.BYTEBUFFER_SIZE, fileSizeInBytes * 2);
       } else {
         //each time we initialize writer, we choose a local temp location 
randomly
         String[] tempFileLocations = model.getStoreLocation();
@@ -416,13 +407,10 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
       writer.writeThrift(blockIndex);
     }
     writer.close();
-    if (enableDirectlyWriteData2Hdfs) {
-      executorServiceSubmitList.add(executorService.submit(
-          new CompleteHdfsBackendThread(indexFileName, 
FileFactory.FileType.HDFS)));
-    } else {
-      CarbonUtil.copyCarbonDataFileToCarbonStorePath(indexFileName,
-          model.getCarbonDataDirectoryPath(),
-          fileSizeInBytes);
+    if (!enableDirectlyWriteData2Hdfs) {
+      CarbonUtil
+          .copyCarbonDataFileToCarbonStorePath(indexFileName, 
model.getCarbonDataDirectoryPath(),
+              fileSizeInBytes);
     }
   }
 
@@ -459,11 +447,9 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
      * carbon store path
      */
     private String fileName;
-    private FileFactory.FileType fileType;
 
-    private CompleteHdfsBackendThread(String fileName, FileFactory.FileType 
fileType) {
+    private CompleteHdfsBackendThread(String fileName) {
       this.fileName = fileName;
-      this.fileType = fileType;
     }
 
     /**
@@ -474,13 +460,8 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
      */
     @Override
     public Void call() throws Exception {
-      if (FileFactory.FileType.HDFS == fileType) {
-        CarbonUtil.completeRemainingHdfsReplicas(fileName, fileType);
-      } else {
-        CarbonUtil.copyCarbonDataFileToCarbonStorePath(fileName,
-            model.getCarbonDataDirectoryPath(),
-            fileSizeInBytes);
-      }
+      CarbonUtil.copyCarbonDataFileToCarbonStorePath(fileName, 
model.getCarbonDataDirectoryPath(),
+          fileSizeInBytes);
       return null;
     }
   }

Reply via email to