This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new cc59e74 [CARBONDATA-3925] flink write carbon file to hdfs when file
size is less than 1M,can't write
cc59e74 is described below
commit cc59e74d9026048bd36335db30611f1bcd32fd9b
Author: xiaoyu <[email protected]>
AuthorDate: Sat Aug 15 15:46:23 2020 +0800
[CARBONDATA-3925] flink write carbon file to hdfs when file size is less
than 1M,can't write
Why is this PR needed?
write carbon file less than 1M , less than configured minimum value
(dfs.namenode.fs-limits.min-block-size) 1M ;can't write hdfs file
What changes were proposed in this PR?
the default block size is set 1024 byte and compare with file size;when
file size big than 1KB but less than 1M ,can not write file to hdfs ;so
set default block size to 2MB
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
This closes #3892
---
.../main/java/org/apache/carbon/core/metadata/StageManager.java | 2 +-
.../flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java | 8 +++++---
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git
a/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java
b/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java
index 5abd0ed..114a5a6 100644
---
a/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java
+++
b/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java
@@ -81,7 +81,7 @@ public final class StageManager {
private static void writeSuccessFile(final String successFilePath) throws
IOException {
final DataOutputStream segmentStatusSuccessOutputStream =
FileFactory.getDataOutputStream(successFilePath,
- CarbonCommonConstants.BYTEBUFFER_SIZE, 1024);
+ CarbonCommonConstants.BYTEBUFFER_SIZE, 1024 * 1024 * 2);
try {
IOUtils.copyBytes(
new ByteArrayInputStream(new byte[0]),
diff --git
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
index 847b312..5034bc1 100644
--- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
@@ -41,7 +41,7 @@ import org.apache.log4j.Logger;
public abstract class CarbonWriter extends ProxyFileWriter<Object[]> {
private static final Logger LOGGER =
- LogServiceFactory.getLogService(CarbonS3Writer.class.getName());
+ LogServiceFactory.getLogService(CarbonWriter.class.getName());
public CarbonWriter(final CarbonWriterFactory factory,
final String identifier, final CarbonTable table) {
@@ -87,7 +87,8 @@ public abstract class CarbonWriter extends
ProxyFileWriter<Object[]> {
"Upload file[" + file.getAbsolutePath() + "] to [" + remotePath
+ "] start.");
}
try {
-
CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(),
remotePath, 1024);
+
CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(),
remotePath,
+ 1024 * 1024 * 2);
} catch (CarbonDataWriterException exception) {
LOGGER.error(exception.getMessage(), exception);
throw exception;
@@ -131,7 +132,8 @@ public abstract class CarbonWriter extends
ProxyFileWriter<Object[]> {
LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" +
remotePath + "] start.");
}
try {
- CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(),
remotePath, 1024);
+ CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(),
remotePath,
+ 1024 * 1024 * 2);
} catch (CarbonDataWriterException exception) {
LOGGER.error(exception.getMessage(), exception);
throw exception;