Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1642#discussion_r157333626
  
    --- Diff: 
hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
 ---
    @@ -18,22 +18,311 @@
     package org.apache.carbondata.hadoop.api;
     
     import java.io.IOException;
    +import java.util.List;
    +import java.util.Random;
     
    -import org.apache.hadoop.fs.FileSystem;
    -import org.apache.hadoop.mapred.FileOutputFormat;
    -import org.apache.hadoop.mapred.JobConf;
    -import org.apache.hadoop.mapred.RecordWriter;
    -import org.apache.hadoop.util.Progressable;
    +import org.apache.carbondata.common.CarbonIterator;
    +import org.apache.carbondata.core.constants.CarbonCommonConstants;
    +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
    +import org.apache.carbondata.core.metadata.datatype.StructField;
    +import org.apache.carbondata.core.metadata.datatype.StructType;
    +import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    +import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    +import org.apache.carbondata.core.util.CarbonProperties;
    +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
    +import org.apache.carbondata.processing.loading.DataLoadExecutor;
    +import 
org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
    +import 
org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
    +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
    +import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.NullWritable;
    +import org.apache.hadoop.mapreduce.OutputCommitter;
    +import org.apache.hadoop.mapreduce.RecordWriter;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     
     /**
      * Base class for all output format for CarbonData file.
    - * @param <T>
      */
    -public abstract class CarbonTableOutputFormat<T> extends 
FileOutputFormat<Void, T> {
    +public class CarbonTableOutputFormat extends 
FileOutputFormat<NullWritable, StringArrayWritable> {
    +
    +  private static final String LOAD_MODEL = 
"mapreduce.carbonoutputformat.load.mmodel";
    +  private static final String DATABASE_NAME = 
"mapreduce.carbonoutputformat.databaseName";
    +  private static final String TABLE_NAME = 
"mapreduce.carbonoutputformat.tableName";
    +  private static final String TABLE = "mapreduce.carbonoutputformat.table";
    +  private static final String TABLE_PATH = 
"mapreduce.carbonoutputformat.tablepath";
    +  private static final String INPUT_SCHEMA = 
"mapreduce.carbonoutputformat.inputschema";
    +  private static final String TEMP_STORE_LOCATIONS = 
"carbon.load.tempstore.locations";
    +  private static final String OVERWRITE_SET = "carbon.load.set.overwrite";
    +  public static final String COMPLEX_DELIMITERS = 
"mapreduce.carbonoutputformat.complex_delimiters";
    +  public static final String SERIALIZATION_NULL_FORMAT =
    +      "mapreduce.carbonoutputformat.serialization.null.format";
    +  public static final String BAD_RECORDS_LOGGER_ENABLE =
    +      "mapreduce.carbonoutputformat.bad.records.logger.enable";
    +  public static final String BAD_RECORDS_LOGGER_ACTION =
    +      "mapreduce.carbonoutputformat.bad.records.logger.action";
    +  public static final String IS_EMPTY_DATA_BAD_RECORD =
    +      "mapreduce.carbonoutputformat.empty.data.bad.record";
    +  public static final String SKIP_EMPTY_LINE = 
"mapreduce.carbonoutputformat.skip.empty.line";
    +  public static final String SORT_SCOPE = 
"mapreduce.carbonoutputformat.load.sort.scope";
    +  public static final String BATCH_SORT_SIZE_INMB =
    +      "mapreduce.carbonoutputformat.batch.sort.size.inmb";
    +  public static final String GLOBAL_SORT_PARTITIONS =
    +      "mapreduce.carbonoutputformat.global.sort.partitions";
    +  public static final String BAD_RECORD_PATH = 
"mapreduce.carbonoutputformat.bad.record.path";
    +  public static final String DATE_FORMAT = 
"mapreduce.carbonoutputformat.date.format";
    +  public static final String TIMESTAMP_FORMAT = 
"mapreduce.carbonoutputformat.timestamp.format";
    +  public static final String IS_ONE_PASS_LOAD = 
"mapreduce.carbonoutputformat.one.pass.load";
    +  public static final String DICTIONARY_SERVER_HOST =
    +      "mapreduce.carbonoutputformat.dict.server.host";
    +  public static final String DICTIONARY_SERVER_PORT =
    +      "mapreduce.carbonoutputformat.dict.server.port";
    +
    +  private CarbonOutputCommitter committer;
    +
    +  public static void setDatabaseName(Configuration configuration, String 
databaseName) {
    +    if (null != databaseName) {
    +      configuration.set(DATABASE_NAME, databaseName);
    +    }
    +  }
    +
    +  public static String getDatabaseName(Configuration configuration) {
    +    return configuration.get(DATABASE_NAME);
    +  }
    +
    +  public static void setTableName(Configuration configuration, String 
tableName) {
    +    if (null != tableName) {
    +      configuration.set(TABLE_NAME, tableName);
    +    }
    +  }
    +
    +  public static String getTableName(Configuration configuration) {
    +    return configuration.get(TABLE_NAME);
    +  }
    +
    +  public static void setTablePath(Configuration configuration, String 
tablePath) {
    +    if (null != tablePath) {
    +      configuration.set(TABLE_PATH, tablePath);
    +    }
    +  }
    +
    +  public static String getTablePath(Configuration configuration) {
    +    return configuration.get(TABLE_PATH);
    +  }
     
    -  @Override
    -  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf 
job, String name,
    -      Progressable progress) throws IOException {
    +  public static void setCarbonTable(Configuration configuration, 
CarbonTable carbonTable)
    +      throws IOException {
    +    if (carbonTable != null) {
    +      configuration.set(TABLE,
    +          
ObjectSerializationUtil.convertObjectToString(carbonTable.getTableInfo().serialize()));
    +    }
    +  }
    +
    +  public static CarbonTable getOrCreateCarbonTable(Configuration 
configuration) throws IOException {
    +    CarbonTable carbonTable = null;
    +    String encodedString = configuration.get(TABLE);
    +    if (encodedString != null) {
    +      byte[] bytes = (byte[]) 
ObjectSerializationUtil.convertStringToObject(encodedString);
    +      TableInfo tableInfo = TableInfo.deserialize(bytes);
    +      carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
    +    }
    +    return carbonTable;
    +  }
    +
    +  public static void setLoadModel(Configuration configuration, 
CarbonLoadModel loadModel)
    +      throws IOException {
    +    if (loadModel != null) {
    +      configuration.set(LOAD_MODEL, 
ObjectSerializationUtil.convertObjectToString(loadModel));
    +    }
    +  }
    +
    +  public static void setInputSchema(Configuration configuration, 
StructType inputSchema)
    +      throws IOException {
    +    if (inputSchema != null && inputSchema.getFields().size() > 0) {
    +      configuration.set(INPUT_SCHEMA, 
ObjectSerializationUtil.convertObjectToString(inputSchema));
    +    } else {
    +      throw new UnsupportedOperationException("Input schema must be set");
    +    }
    +  }
    +
    +  private static StructType getInputSchema(Configuration configuration) 
throws IOException {
    +    String encodedString = configuration.get(INPUT_SCHEMA);
    +    if (encodedString != null) {
    +      return (StructType) 
ObjectSerializationUtil.convertStringToObject(encodedString);
    +    }
         return null;
       }
    +
    +  public static boolean isOverwriteSet(Configuration configuration) {
    +    String overwrite = configuration.get(OVERWRITE_SET);
    +    if (overwrite != null) {
    +      return Boolean.parseBoolean(overwrite);
    +    }
    +    return false;
    +  }
    +
    +  public static void setOverwrite(Configuration configuration, boolean 
overwrite) {
    +    configuration.set(OVERWRITE_SET, String.valueOf(overwrite));
    +  }
    +
    +  public static void setTempStoreLocations(Configuration configuration, 
String[] tempLocations)
    +      throws IOException {
    +    if (tempLocations != null && tempLocations.length > 0) {
    +      configuration
    +          .set(TEMP_STORE_LOCATIONS, 
ObjectSerializationUtil.convertObjectToString(tempLocations));
    +    }
    +  }
    +
    +  private static String[] getTempStoreLocations(TaskAttemptContext 
taskAttemptContext)
    +      throws IOException {
    +    String encodedString = 
taskAttemptContext.getConfiguration().get(TEMP_STORE_LOCATIONS);
    +    if (encodedString != null) {
    +      return (String[]) 
ObjectSerializationUtil.convertStringToObject(encodedString);
    +    }
    +    return new String[] {
    +        System.getProperty("java.io.tmpdir") + "/" + System.nanoTime() + 
"_" + taskAttemptContext
    +            .getTaskAttemptID().toString() };
    +  }
    +
    +  @Override public synchronized OutputCommitter 
getOutputCommitter(TaskAttemptContext context)
    +      throws IOException {
    +    if (this.committer == null) {
    +      Path output = getOutputPath(context);
    +      this.committer = new CarbonOutputCommitter(output, context);
    +    }
    +
    +    return this.committer;
    +  }
    +
    +  @Override public RecordWriter<NullWritable, StringArrayWritable> 
getRecordWriter(
    +      TaskAttemptContext taskAttemptContext) throws IOException {
    +    final CarbonLoadModel loadModel = 
getLoadModel(taskAttemptContext.getConfiguration());
    +    loadModel.setTaskNo(new Random().nextInt(Integer.MAX_VALUE) + "");
    --- End diff --
    
    Now I get the task id from the task context instead of a random number.


---

Reply via email to