This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new cc59e74  [CARBONDATA-3925] flink write carbon file to hdfs when file 
size is less than 1M,can't write
cc59e74 is described below

commit cc59e74d9026048bd36335db30611f1bcd32fd9b
Author: xiaoyu <[email protected]>
AuthorDate: Sat Aug 15 15:46:23 2020 +0800

    [CARBONDATA-3925] flink write carbon file to hdfs when file size is less 
than 1M,can't write
    
    Why is this PR needed?
    Writing a carbon file smaller than 1 MB fails: the size is below the
    configured minimum value (dfs.namenode.fs-limits.min-block-size) of 1 MB,
    so the file cannot be written to HDFS.
    
    What changes were proposed in this PR?
    The default block size was set to 1024 bytes and is compared with the
    file size; when the file is larger than 1 KB but smaller than 1 MB, it
    cannot be written to HDFS. So set the default block size to 2 MB.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3892
---
 .../main/java/org/apache/carbon/core/metadata/StageManager.java   | 2 +-
 .../flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java | 8 +++++---
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git 
a/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java
 
b/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java
index 5abd0ed..114a5a6 100644
--- 
a/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java
+++ 
b/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java
@@ -81,7 +81,7 @@ public final class StageManager {
   private static void writeSuccessFile(final String successFilePath) throws 
IOException {
     final DataOutputStream segmentStatusSuccessOutputStream =
         FileFactory.getDataOutputStream(successFilePath,
-            CarbonCommonConstants.BYTEBUFFER_SIZE, 1024);
+            CarbonCommonConstants.BYTEBUFFER_SIZE, 1024 * 1024 * 2);
     try {
       IOUtils.copyBytes(
           new ByteArrayInputStream(new byte[0]),
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
index 847b312..5034bc1 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
@@ -41,7 +41,7 @@ import org.apache.log4j.Logger;
 public abstract class CarbonWriter extends ProxyFileWriter<Object[]> {
 
   private static final Logger LOGGER =
-      LogServiceFactory.getLogService(CarbonS3Writer.class.getName());
+      LogServiceFactory.getLogService(CarbonWriter.class.getName());
 
   public CarbonWriter(final CarbonWriterFactory factory,
       final String identifier, final CarbonTable table) {
@@ -87,7 +87,8 @@ public abstract class CarbonWriter extends 
ProxyFileWriter<Object[]> {
               "Upload file[" + file.getAbsolutePath() + "] to [" + remotePath 
+ "] start.");
         }
         try {
-          
CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), 
remotePath, 1024);
+          
CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), 
remotePath,
+                  1024 * 1024 * 2);
         } catch (CarbonDataWriterException exception) {
           LOGGER.error(exception.getMessage(), exception);
           throw exception;
@@ -131,7 +132,8 @@ public abstract class CarbonWriter extends 
ProxyFileWriter<Object[]> {
         LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + 
remotePath + "] start.");
       }
       try {
-        CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), 
remotePath, 1024);
+        CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), 
remotePath,
+                  1024 * 1024 * 2);
       } catch (CarbonDataWriterException exception) {
         LOGGER.error(exception.getMessage(), exception);
         throw exception;

Reply via email to