Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333237

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---

@@ -18,22 +18,311 @@
 package org.apache.carbondata.hadoop.api;

 import java.io.IOException;
+import java.util.List;
+import java.util.Random;

-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.util.Progressable;
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.datatype.StructType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+import org.apache.carbondata.processing.loading.DataLoadExecutor;
+import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
+import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
+import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 /**
  * Base class for all output format for CarbonData file.
- * @param <T>
  */
-public abstract class CarbonTableOutputFormat<T> extends FileOutputFormat<Void, T> {
+public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, StringArrayWritable> {
+
+  private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel";
+  private static final String DATABASE_NAME = "mapreduce.carbonoutputformat.databaseName";
+  private static final String TABLE_NAME = "mapreduce.carbonoutputformat.tableName";
+  private static final String TABLE = "mapreduce.carbonoutputformat.table";
+  private static final String TABLE_PATH = "mapreduce.carbonoutputformat.tablepath";
+  private static final String INPUT_SCHEMA = "mapreduce.carbonoutputformat.inputschema";
+  private static final String TEMP_STORE_LOCATIONS = "carbon.load.tempstore.locations";
+  private static final String OVERWRITE_SET = "carbon.load.set.overwrite";
+  public static final String COMPLEX_DELIMITERS = "mapreduce.carbonoutputformat.complex_delimiters";
+  public static final String SERIALIZATION_NULL_FORMAT =
+      "mapreduce.carbonoutputformat.serialization.null.format";
+  public static final String BAD_RECORDS_LOGGER_ENABLE =
+      "mapreduce.carbonoutputformat.bad.records.logger.enable";
+  public static final String BAD_RECORDS_LOGGER_ACTION =
+      "mapreduce.carbonoutputformat.bad.records.logger.action";
+  public static final String IS_EMPTY_DATA_BAD_RECORD =
+      "mapreduce.carbonoutputformat.empty.data.bad.record";
+  public static final String SKIP_EMPTY_LINE = "mapreduce.carbonoutputformat.skip.empty.line";
+  public static final String SORT_SCOPE = "mapreduce.carbonoutputformat.load.sort.scope";
+  public static final String BATCH_SORT_SIZE_INMB =
+      "mapreduce.carbonoutputformat.batch.sort.size.inmb";
+  public static final String GLOBAL_SORT_PARTITIONS =
+      "mapreduce.carbonoutputformat.global.sort.partitions";
+  public static final String BAD_RECORD_PATH = "mapreduce.carbonoutputformat.bad.record.path";
+  public static final String DATE_FORMAT = "mapreduce.carbonoutputformat.date.format";
+  public static final String TIMESTAMP_FORMAT = "mapreduce.carbonoutputformat.timestamp.format";
+  public static final String IS_ONE_PASS_LOAD = "mapreduce.carbonoutputformat.one.pass.load";
+  public static final String DICTIONARY_SERVER_HOST =
+      "mapreduce.carbonoutputformat.dict.server.host";
+  public static final String DICTIONARY_SERVER_PORT =
+      "mapreduce.carbonoutputformat.dict.server.port";
+
+  private CarbonOutputCommitter committer;
+
+  public static void setDatabaseName(Configuration configuration, String databaseName) {
+    if (null != databaseName) {
+      configuration.set(DATABASE_NAME, databaseName);
+    }
+  }
+
+  public static String getDatabaseName(Configuration configuration) {
+    return configuration.get(DATABASE_NAME);
+  }
+
+  public static void setTableName(Configuration configuration, String tableName) {
+    if (null != tableName) {
+      configuration.set(TABLE_NAME, tableName);
+    }
+  }
+
+  public static String getTableName(Configuration configuration) {
+    return configuration.get(TABLE_NAME);
+  }
+
+  public static void setTablePath(Configuration configuration, String tablePath) {
+    if (null != tablePath) {
+      configuration.set(TABLE_PATH, tablePath);
+    }
+  }
+
+  public static String getTablePath(Configuration configuration) {
+    return configuration.get(TABLE_PATH);
+  }

-  @Override
-  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf job, String name,
-      Progressable progress) throws IOException {
+  public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable)
+      throws IOException {
+    if (carbonTable != null) {
+      configuration.set(TABLE,
+          ObjectSerializationUtil.convertObjectToString(carbonTable.getTableInfo().serialize()));
+    }
+  }
+
+  public static CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+    CarbonTable carbonTable = null;
+    String encodedString = configuration.get(TABLE);
+    if (encodedString != null) {
+      byte[] bytes = (byte[]) ObjectSerializationUtil.convertStringToObject(encodedString);
+      TableInfo tableInfo = TableInfo.deserialize(bytes);
+      carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+    }
+    return carbonTable;
+  }
+
+  public static void setLoadModel(Configuration configuration, CarbonLoadModel loadModel)
+      throws IOException {
+    if (loadModel != null) {
+      configuration.set(LOAD_MODEL, ObjectSerializationUtil.convertObjectToString(loadModel));
+    }
+  }
+
+  public static void setInputSchema(Configuration configuration, StructType inputSchema)
+      throws IOException {
+    if (inputSchema != null && inputSchema.getFields().size() > 0) {
+      configuration.set(INPUT_SCHEMA, ObjectSerializationUtil.convertObjectToString(inputSchema));
+    } else {
+      throw new UnsupportedOperationException("Input schema must be set");
+    }
+  }
+
+  private static StructType getInputSchema(Configuration configuration) throws IOException {
+    String encodedString = configuration.get(INPUT_SCHEMA);
+    if (encodedString != null) {
+      return (StructType) ObjectSerializationUtil.convertStringToObject(encodedString);
+    }
     return null;
   }
+
+  public static boolean isOverwriteSet(Configuration configuration) {
+    String overwrite = configuration.get(OVERWRITE_SET);
+    if (overwrite != null) {
+      return Boolean.parseBoolean(overwrite);
+    }
+    return false;
+  }
+
+  public static void setOverwrite(Configuration configuration, boolean overwrite) {
+    configuration.set(OVERWRITE_SET, String.valueOf(overwrite));
+  }
+
+  public static void setTempStoreLocations(Configuration configuration, String[] tempLocations)
+      throws IOException {
+    if (tempLocations != null && tempLocations.length > 0) {
+      configuration
+          .set(TEMP_STORE_LOCATIONS, ObjectSerializationUtil.convertObjectToString(tempLocations));
+    }
+  }
+
+  private static String[] getTempStoreLocations(TaskAttemptContext taskAttemptContext)
+      throws IOException {
+    String encodedString = taskAttemptContext.getConfiguration().get(TEMP_STORE_LOCATIONS);
+    if (encodedString != null) {
+      return (String[]) ObjectSerializationUtil.convertStringToObject(encodedString);
+    }
+    return new String[] {
+        System.getProperty("java.io.tmpdir") + "/" + System.nanoTime() + "_" + taskAttemptContext
+            .getTaskAttemptID().toString() };
+  }
+
+  @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException {
+    if (this.committer == null) {
+      Path output = getOutputPath(context);
+      this.committer = new CarbonOutputCommitter(output, context);
+    }
+
+    return this.committer;
+  }
+
+  @Override public RecordWriter<NullWritable, StringArrayWritable> getRecordWriter(
+      TaskAttemptContext taskAttemptContext) throws IOException {
+    final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
+    loadModel.setTaskNo(new Random().nextInt(Integer.MAX_VALUE) + "");
+    final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext);
+    final CarbonOutputIteratorWrapper iteratorWrapper = new CarbonOutputIteratorWrapper();
+    final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor();
+    CarbonRecordWriter recordWriter = new CarbonRecordWriter(iteratorWrapper, dataLoadExecutor);
+    new Thread() {
+      @Override public void run() {
+        try {
+          dataLoadExecutor
+              .execute(loadModel, tempStoreLocations, new CarbonIterator[] { iteratorWrapper });
+        } catch (Exception e) {
+          dataLoadExecutor.close();
+          throw new RuntimeException(e);
+        }
+      }
+    }.start();
+
+    return recordWriter;
+  }
+
+  public static CarbonLoadModel getLoadModel(Configuration conf) throws IOException {
+    CarbonLoadModel model;
--- End diff --

But it can be accessed only through a static method, so we can't cache it.
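
For readers following along, a minimal sketch of how a job driver could wire up this output format through the static setters shown in the diff. The database/table/path values and the buildLoadModel() helper here are illustrative placeholders, not part of the PR; assembling a real CarbonLoadModel (and setting the input schema) is elided.

    import java.io.IOException;

    import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
    import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class CarbonLoadDriver {

      public static void main(String[] args) throws IOException {
        Job job = Job.getInstance(new Configuration(), "carbon-load");
        Configuration conf = job.getConfiguration();

        // Everything the tasks need travels through the Configuration: the
        // static setters serialize their arguments into string properties,
        // and the static getters on the task side deserialize them back.
        CarbonTableOutputFormat.setDatabaseName(conf, "default");
        CarbonTableOutputFormat.setTableName(conf, "t1");
        CarbonTableOutputFormat.setTablePath(conf, "/store/default/t1");
        CarbonTableOutputFormat.setLoadModel(conf, buildLoadModel());
        CarbonTableOutputFormat.setOverwrite(conf, false);

        job.setOutputFormatClass(CarbonTableOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(StringArrayWritable.class);
        FileOutputFormat.setOutputPath(job, new Path("/store/default/t1"));
        // ... configure the input format/mapper and submit the job as usual.
      }

      // Hypothetical helper: building a fully populated CarbonLoadModel is
      // out of scope for this sketch.
      private static CarbonLoadModel buildLoadModel() {
        return new CarbonLoadModel();
      }
    }

This also shows why caching is awkward: all state travels through the Configuration as serialized strings, and getLoadModel(Configuration) is static and stateless, so each getRecordWriter call rebuilds the model from the encoded property. A cache would have to be static state keyed on that encoded string rather than a field on an OutputFormat instance.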
---