[HOTFIX] support "carbon.load.directWriteHdfs.enabled" for S3

Problem: Currently, for S3, when the above carbon property is set, the index
file is not written to the S3 store path because of a bug in the folder path.

Solution: The wrong file separator is used when building the path. Fix it.
Also rename the carbon property
"carbon.load.directWriteHdfs.enabled" to
"carbon.load.directWriteToStorePath.enabled"
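
The separator issue described above can be illustrated with a minimal sketch. It is not the code from this commit: the class StorePathJoin, its join helper, and the file name example.carbonindex are hypothetical, and STORE_SEPARATOR is assumed to hold "/", standing in for CarbonCommonConstants.FILE_SEPARATOR. The point is that java.io.File.separator depends on the local platform, while an HDFS/S3 store path expects a fixed separator.

```java
import java.io.File;

public class StorePathJoin {
    // Assumption: stands in for CarbonCommonConstants.FILE_SEPARATOR, taken to be "/".
    static final String STORE_SEPARATOR = "/";

    // Hypothetical helper: joins a store directory and a file name with the
    // fixed store separator, independent of the local operating system.
    static String join(String directory, String fileName) {
        if (directory.endsWith(STORE_SEPARATOR)) {
            return directory + fileName;
        }
        return directory + STORE_SEPARATOR + fileName;
    }

    public static void main(String[] args) {
        String dir = "s3a://sdk/WriterOutput";
        String name = "example.carbonindex"; // illustrative file name only

        // Fixed separator: same result on every platform.
        System.out.println(join(dir, name));

        // Platform separator: the result varies with the OS running the load.
        System.out.println(dir + File.separator + name);
    }
}
```

The first line printed is the same on every platform. The second varies with the local file system, which is why a platform-dependent separator is a poor fit for remote store paths.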

This closes #2697


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5d17ff40
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5d17ff40
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5d17ff40

Branch: refs/heads/branch-1.5
Commit: 5d17ff40bdeeba64a8885fa2df427fbdec6a38ea
Parents: 2a4f530
Author: ajantha-bhat <[email protected]>
Authored: Thu Sep 6 16:47:22 2018 +0530
Committer: kunal642 <[email protected]>
Committed: Thu Sep 27 14:03:37 2018 +0530

----------------------------------------------------------------------
 .../constants/CarbonLoadOptionConstants.java    |  6 ++--
 docs/configuration-parameters.md                |  2 +-
 .../carbondata/examples/sdk/SDKS3Example.java   | 12 +++++++
 .../dataload/TestLoadDataGeneral.scala          |  8 ++---
 .../store/writer/AbstractFactDataWriter.java    | 38 ++++++++++----------
 5 files changed, 39 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d17ff40/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
index 3eab69d..82485ca 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -136,9 +136,9 @@ public final class CarbonLoadOptionConstants {
   public static final String SORT_COLUMN_BOUNDS_ROW_DELIMITER = ";";
 
   @CarbonProperty
-  public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS
-      = "carbon.load.directWriteHdfs.enabled";
-  public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT = "false";
+  public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH
+      = "carbon.load.directWriteToStorePath.enabled";
+  public static final String ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT = "false";
 
   /**
    * If the sort memory is insufficient, spill inmemory pages to disk.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d17ff40/docs/configuration-parameters.md
----------------------------------------------------------------------
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 662525b..9dd8164 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -90,7 +90,7 @@ This section provides the details of all the configurations 
required for the Car
 | carbon.prefetch.buffersize | 1000 | When the configuration 
***carbon.merge.sort.prefetch*** is configured to true, we need to set the 
number of records that can be prefetched.This configuration is used specify the 
number of records to be prefetched.**NOTE: **Configuring more number of records 
to be prefetched increases memory footprint as more records will have to be 
kept in memory. |
 | load_min_size_inmb | 256 | This configuration is used along with 
***carbon.load.min.size.enabled***.This determines the minimum size of input 
files to be considered for distribution among executors while data 
loading.**NOTE:** Refer to ***carbon.load.min.size.enabled*** for understanding 
when this configuration needs to be used and its advantages and disadvantages. |
 | carbon.load.sortmemory.spill.percentage | 0 | During data loading, some data 
pages are kept in memory upto memory configured in 
***carbon.sort.storage.inmemory.size.inmb*** beyond which they are spilled to 
disk as intermediate temporary sort files.This configuration determines after 
what percentage data needs to be spilled to disk.**NOTE:** Without this 
configuration, when the data pages occupy upto configured memory, new data 
pages would be dumped to disk and old pages are still maintained in disk. |
-| carbon.load.directWriteHdfs.enabled | false | During data load all the 
carbondata files are written to local disk and finally copied to the target 
location in HDFS.Enabling this parameter will make carrbondata files to be 
written directly onto target HDFS location bypassing the local disk.**NOTE:** 
Writing directly to HDFS saves local disk IO(once for writing the files and 
again for copying to HDFS) there by improving the performance.But the drawback 
is when data loading fails or the application crashes, unwanted carbondata 
files will remain in the target HDFS location until it is cleared during next 
data load or by running *CLEAN FILES* DDL command |
+| carbon.load.directWriteToStorePath.enabled | false | During data load, all 
the carbondata files are written to local disk and finally copied to the target 
store location in HDFS/S3.Enabling this parameter will make carbondata files to 
be written directly onto target HDFS/S3 location bypassing the local 
disk.**NOTE:** Writing directly to HDFS/S3 saves local disk IO(once for writing 
the files and again for copying to HDFS/S3) there by improving the 
performance.But the drawback is when data loading fails or the application 
crashes, unwanted carbondata files will remain in the target HDFS/S3 location 
until it is cleared during next data load or by running *CLEAN FILES* DDL 
command |
 | carbon.options.serialization.null.format | \N | Based on the business 
scenarios, some columns might need to be loaded with null values.As null value 
cannot be written in csv files, some special characters might be adopted to 
specify null values.This configuration can be used to specify the null values 
format in the data being loaded. |
 | carbon.sort.storage.inmemory.size.inmb | 512 | CarbonData writes every 
***carbon.sort.size*** number of records to intermediate temp files during data 
loading to ensure memory footprint is within limits.When 
***enable.unsafe.sort*** configuration is enabled, instead of using 
***carbon.sort.size*** which is based on rows count, size occupied in memory is 
used to determine when to flush data pages to intermediate temp files.This 
configuration determines the memory to be used for storing data pages in 
memory.**NOTE:** Configuring a higher values ensures more data is maintained in 
memory and hence increases data loading performance due to reduced or no 
IO.Based on the memory availability in the nodes of the cluster, configure the 
values accordingly. |
 | carbon.column.compressor | snappy | CarbonData will compress the column 
values using the compressor specified by this configuration. Currently 
CarbonData supports 'snappy' and 'zstd' compressors. |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d17ff40/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
index d4f49f5..bc0e280 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
@@ -19,10 +19,12 @@ package org.apache.carbondata.examples.sdk;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
 import 
org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.sdk.file.*;
 
 import org.apache.hadoop.conf.Configuration;
@@ -40,6 +42,12 @@ public class SDKS3Example {
             System.exit(0);
         }
 
+        String backupProperty = CarbonProperties.getInstance()
+            
.getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
+                
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT);
+        CarbonProperties.getInstance()
+            
.addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
 "true");
+
         String path = "s3a://sdk/WriterOutput";
         if (args.length > 3) {
             path=args[3];
@@ -87,5 +95,9 @@ public class SDKS3Example {
         }
         System.out.println("\nFinished");
         reader.close();
+
+        CarbonProperties.getInstance()
+            
.addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
+                backupProperty);
     }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d17ff40/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 8b51090..02abb8d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -243,10 +243,10 @@ class TestLoadDataGeneral extends QueryTest with 
BeforeAndAfterEach {
 
   test("test data loading with directly writing fact data to hdfs") {
     val originStatus = CarbonProperties.getInstance().getProperty(
-      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
-      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT)
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
+      
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT)
     CarbonProperties.getInstance().addProperty(
-      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS, "true")
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, 
"true")
 
     val testData = s"$resourcesPath/sample.csv"
     sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
@@ -256,7 +256,7 @@ class TestLoadDataGeneral extends QueryTest with 
BeforeAndAfterEach {
     )
 
     CarbonProperties.getInstance().addProperty(
-      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
+      CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
       originStatus)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5d17ff40/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 4afb3ef..37d33c2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -66,9 +66,9 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
   protected WritableByteChannel fileChannel;
   protected long currentOffsetInFile;
   /**
-   * The path of CarbonData file to write in hdfs
+   * The path of CarbonData file to write in hdfs/s3
    */
-  private String carbonDataFileHdfsPath;
+  private String carbonDataFileStorePath;
   /**
    * The temp path of carbonData file used on executor
    */
@@ -145,9 +145,9 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
    */
   protected DataMapWriterListener listener;
   /**
-   * Whether directly write fact data to hdfs
+   * Whether directly write fact data to store path
    */
-  private boolean enableDirectlyWriteData2Hdfs = false;
+  private boolean enableDirectlyWriteDataToStorePath = false;
 
   protected ExecutorService fallbackExecutorService;
 
@@ -172,11 +172,11 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
 
     // whether to directly write fact data to HDFS
     String directlyWriteData2Hdfs = propInstance
-        
.getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS,
-            
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_HDFS_DEFAULT);
-    this.enableDirectlyWriteData2Hdfs = 
"TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);
+        
.getProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH,
+            
CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH_DEFAULT);
+    this.enableDirectlyWriteDataToStorePath = 
"TRUE".equalsIgnoreCase(directlyWriteData2Hdfs);
 
-    if (enableDirectlyWriteData2Hdfs) {
+    if (enableDirectlyWriteDataToStorePath) {
       LOGGER.info("Carbondata will directly write fact data to HDFS.");
     } else {
       LOGGER.info("Carbondata will write temporary fact data to local disk.");
@@ -225,7 +225,7 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
     if ((currentFileSize + blockletSizeToBeAdded) >= blockSizeThreshold && 
currentFileSize != 0) {
       // set the current file size to zero
       String activeFile =
-          enableDirectlyWriteData2Hdfs ? carbonDataFileHdfsPath : 
carbonDataFileTempPath;
+          enableDirectlyWriteDataToStorePath ? carbonDataFileStorePath : 
carbonDataFileTempPath;
       LOGGER.info("Writing data to file as max file size reached for file: "
           + activeFile + ". Data block size: " + currentFileSize);
       // write meta data to end of the existing file
@@ -269,7 +269,7 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
   protected void commitCurrentFile(boolean copyInCurrentThread) {
     notifyDataMapBlockEnd();
     CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
-    if (!enableDirectlyWriteData2Hdfs) {
+    if (!enableDirectlyWriteDataToStorePath) {
       try {
         if (copyInCurrentThread) {
           
CarbonUtil.copyCarbonDataFileToCarbonStorePath(carbonDataFileTempPath,
@@ -296,14 +296,14 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
         .getCarbonDataFileName(fileCount, 
model.getCarbonDataFileAttributes().getTaskId(),
             model.getBucketId(), model.getTaskExtension(),
             "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), 
model.getSegmentId());
-    this.carbonDataFileHdfsPath = model.getCarbonDataDirectoryPath() + 
File.separator
+    this.carbonDataFileStorePath = model.getCarbonDataDirectoryPath() + 
File.separator
         + carbonDataFileName;
     try {
-      if (enableDirectlyWriteData2Hdfs) {
+      if (enableDirectlyWriteDataToStorePath) {
         // the block size will be twice the block_size specified by user to 
make sure that
         // one carbondata file only consists exactly one HDFS block.
         fileOutputStream = FileFactory
-            .getDataOutputStream(carbonDataFileHdfsPath, 
FileFactory.FileType.HDFS,
+            .getDataOutputStream(carbonDataFileStorePath, 
FileFactory.FileType.HDFS,
                 CarbonCommonConstants.BYTEBUFFER_SIZE, fileSizeInBytes * 2);
       } else {
         //each time we initialize writer, we choose a local temp location 
randomly
@@ -380,11 +380,11 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
     // get the block index info thrift
     List<BlockIndex> blockIndexThrift = 
CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList);
     String indexFileName;
-    if (enableDirectlyWriteData2Hdfs) {
-      String rawFileName = model.getCarbonDataDirectoryPath() + File.separator 
+ CarbonTablePath
-          
.getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
-              model.getBucketId(), model.getTaskExtension(),
-              "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), 
model.getSegmentId());
+    if (enableDirectlyWriteDataToStorePath) {
+      String rawFileName = model.getCarbonDataDirectoryPath() + 
CarbonCommonConstants.FILE_SEPARATOR
+          + 
CarbonTablePath.getCarbonIndexFileName(model.getCarbonDataFileAttributes().getTaskId(),
+          model.getBucketId(), model.getTaskExtension(),
+          "" + model.getCarbonDataFileAttributes().getFactTimeStamp(), 
model.getSegmentId());
       indexFileName = FileFactory.getUpdatedFilePath(rawFileName, 
FileFactory.FileType.HDFS);
     } else {
       // randomly choose a temp location for index file
@@ -407,7 +407,7 @@ public abstract class AbstractFactDataWriter implements 
CarbonFactDataWriter {
       writer.writeThrift(blockIndex);
     }
     writer.close();
-    if (!enableDirectlyWriteData2Hdfs) {
+    if (!enableDirectlyWriteDataToStorePath) {
       CarbonUtil
           .copyCarbonDataFileToCarbonStorePath(indexFileName, 
model.getCarbonDataDirectoryPath(),
               fileSizeInBytes);
