This is an automated email from the ASF dual-hosted git repository. ravipesala pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new e2ddc41 [CARBONDATA-3661] Fix target file size check fail when upload local file to carbon store e2ddc41 is described below commit e2ddc415e6530d5dae85ecea43e7bb96504df36b Author: liuzhi <371684...@qq.com> AuthorDate: Fri Jan 10 12:54:24 2020 +0800 [CARBONDATA-3661] Fix target file size check fail when upload local file to carbon store Why is this PR needed? Multiple Flink tasks writing carbon data may use the same carbon data file name, which causes the target file size check to fail when a local file is uploaded to the carbon store. What changes were proposed in this PR? Make each Flink task use a distinct carbon data file name by using a UUID as the writer's task number (taskNo). Does this PR introduce any user interface change? No Is any new testcase added? No This closes #3573 --- .../java/org/apache/carbon/flink/CarbonLocalWriter.java | 1 + .../main/java/org/apache/carbon/flink/CarbonS3Writer.java | 1 + .../org/apache/carbondata/sdk/file/CarbonWriterBuilder.java | 13 +++++++++++++ 3 files changed, 15 insertions(+) diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java index db88cd4..a8068a3 100644 --- a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java +++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java @@ -62,6 +62,7 @@ final class CarbonLocalWriter extends CarbonWriter { try { final CarbonWriterBuilder writerBuilder = org.apache.carbondata.sdk.file.CarbonWriter.builder() + .taskNo(UUID.randomUUID().toString().replace("-", "")) .outputPath(super.getWritePath(row)) .writtenBy("flink") .withSchemaFile(CarbonTablePath.getSchemaFilePath(table.getTablePath())) diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java index ecae32a..d23c668 100644 --- 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java +++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java @@ -65,6 +65,7 @@ final class CarbonS3Writer extends CarbonWriter { try { final CarbonWriterBuilder writerBuilder = org.apache.carbondata.sdk.file.CarbonWriter.builder() + .taskNo(UUID.randomUUID().toString().replace("-", "")) .outputPath(super.getWritePath(row)) .writtenBy("flink") .withSchemaFile(CarbonTablePath.getSchemaFilePath(table.getTablePath())) diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index eb47a8d..cbf899f 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -152,6 +152,19 @@ public class CarbonWriterBuilder { } /** + * sets the taskNo for the writer. SDKs concurrently running + * will set taskNo in order to avoid conflicts in file's name during write. + * + * @param taskNo is the TaskNo user wants to specify. + * by default it is system time in nano seconds. + * @return updated CarbonWriterBuilder + */ + public CarbonWriterBuilder taskNo(String taskNo) { + this.taskNo = taskNo; + return this; + } + + /** * to set the timestamp in the carbondata and carbonindex index files * * @param timestamp is a timestamp to be used in the carbondata and carbonindex index files.