[
https://issues.apache.org/jira/browse/CARBONDATA-2?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15622411#comment-15622411
]
ASF GitHub Bot commented on CARBONDATA-2:
-----------------------------------------
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/263#discussion_r85757712
--- Diff:
integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
---
@@ -213,6 +224,64 @@ public static void executeGraph(CarbonLoadModel
loadModel, String storeLocation,
info, loadModel.getPartitionId(),
loadModel.getCarbonDataLoadSchema());
}
+ public static void executeNewDataLoad(CarbonLoadModel loadModel, String
storeLocation,
+ String hdfsStoreLocation, RecordReader<NullWritable,
StringArrayWritable>[] recordReaders)
+ throws Exception {
+ if (!new File(storeLocation).mkdirs()) {
+ LOGGER.error("Error while creating the temp store path: " +
storeLocation);
+ }
+ CarbonDataLoadConfiguration configuration = new
CarbonDataLoadConfiguration();
+ String databaseName = loadModel.getDatabaseName();
+ String tableName = loadModel.getTableName();
+ String tempLocationKey = databaseName +
CarbonCommonConstants.UNDERSCORE + tableName
+ + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
+ CarbonProperties.getInstance().addProperty(tempLocationKey,
storeLocation);
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS,
hdfsStoreLocation);
+ // CarbonProperties.getInstance().addProperty("store_output_location",
outPutLoc);
+ CarbonProperties.getInstance().addProperty("send.signal.load",
"false");
+
+ CarbonTable carbonTable =
loadModel.getCarbonDataLoadSchema().getCarbonTable();
+ AbsoluteTableIdentifier identifier =
+ carbonTable.getAbsoluteTableIdentifier();
+ configuration.setTableIdentifier(identifier);
+ String csvHeader = loadModel.getCsvHeader();
+ if (csvHeader != null && !csvHeader.isEmpty()) {
+
configuration.setHeader(CarbonDataProcessorUtil.getColumnFields(csvHeader,
","));
+ } else {
+ CarbonFile csvFile =
+
CarbonDataProcessorUtil.getCsvFileToRead(loadModel.getFactFilesToProcess().get(0));
+ configuration
+ .setHeader(CarbonDataProcessorUtil.getFileHeader(csvFile,
loadModel.getCsvDelimiter()));
+ }
+
+ configuration.setPartitionId(loadModel.getPartitionId());
+ configuration.setSegmentId(loadModel.getSegmentId());
+ configuration.setTaskNo(loadModel.getTaskNo());
+
configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS,
+ new String[] { loadModel.getComplexDelimiterLevel1(),
+ loadModel.getComplexDelimiterLevel2() });
+ List<CarbonDimension> dimensions =
+
carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+ List<CarbonMeasure> measures =
+ carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+ DataField[] dataFields = new DataField[dimensions.size() +
measures.size()];
+
+ int i = 0;
+ for (CarbonColumn column : dimensions) {
+ dataFields[i++] = new DataField(column);
+ }
+ for (CarbonColumn column : measures) {
+ dataFields[i++] = new DataField(column);
+ }
+ Iterator[] iterators = new RecordReaderIterator[recordReaders.length];
+ configuration.setDataFields(dataFields);
+ for (int j = 0; j < recordReaders.length; j++) {
+ iterators[j] = new RecordReaderIterator(recordReaders[j]);
+ }
+ new DataLoadProcessExecutor().execute(configuration, iterators);
--- End diff --
should have a CarbonTableOutputFormat and use it here, right?
> Remove kettle for loading data
> ------------------------------
>
> Key: CARBONDATA-2
> URL: https://issues.apache.org/jira/browse/CARBONDATA-2
> Project: CarbonData
> Issue Type: Improvement
> Reporter: Liang Chen
> Priority: Critical
> Fix For: 0.3.0-incubating
>
> Attachments: CarbonDataLoadingdesign.pdf
>
>
> Remove kettle for loading data module
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)