Github user ajantha-bhat commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2384#discussion_r197638101 --- Diff: store/sdk/src/main/java/org/apache/carbondata/sdk/file/JsonCarbonWriter.java --- @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.sdk.file; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.UUID; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.util.JsonCarbonUtil; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +/** + * Writer Implementation to write Json Record to carbondata file. + * json writer requires the path of json file and carbon schema. + */ +@InterfaceAudience.User public class JsonCarbonWriter extends CarbonWriter { + private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter; + private TaskAttemptContext context; + private ObjectArrayWritable writable; + private Schema carbonSchema; + + JsonCarbonWriter(CarbonLoadModel loadModel, Schema carbonSchema) throws IOException { + Configuration OutputHadoopConf = new Configuration(); + CarbonTableOutputFormat.setLoadModel(OutputHadoopConf, loadModel); + CarbonTableOutputFormat outputFormat = new CarbonTableOutputFormat(); + JobID jobId = new JobID(UUID.randomUUID().toString(), 0); + Random random = new Random(); + TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt()); + TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt()); + TaskAttemptContextImpl context = new TaskAttemptContextImpl(OutputHadoopConf, attemptID); + this.recordWriter = outputFormat.getRecordWriter(context); + this.context = context; + this.writable = new ObjectArrayWritable(); + this.carbonSchema = carbonSchema; + } + + /** + * Write single row data, accepts one row of data as json string + * + * @param object (json row as a string) + * @throws IOException + */ + @Override public void write(Object object) throws IOException { + Objects.requireNonNull(object, "Input cannot be null"); + try { + Map<String, Object> jsonNodeMap; + ObjectMapper objectMapper = new ObjectMapper(); + try { + jsonNodeMap = + objectMapper.readValue((String) object, new TypeReference<Map<String, Object>>() { + }); + } catch (IOException e) { + throw new IOException("Failed to parse Json row string "); + } + // convert json object to carbon object. + Object[] writeObjects = JsonCarbonUtil.jsonToCarbonRecord(jsonNodeMap, carbonSchema); + writable.set(writeObjects); + recordWriter.write(NullWritable.get(), writable); + } catch (Exception e) { + close(); + throw new IOException(e); + } + } + + /** + * Takes file or directory path, + * containing array of json rows to write carbondata files. + * + * @param inputFilePath + * @param recordIdentifier + * @throws IOException + */ + public void writeFromJsonFile(String inputFilePath, String recordIdentifier) throws IOException { --- End diff -- removed it.
---