jackylk commented on a change in pull request #3875:
URL: https://github.com/apache/carbondata/pull/3875#discussion_r491960506



##########
File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
##########
@@ -52,25 +53,30 @@
 
   @Override
   public void setupJob(JobContext jobContext) throws IOException {
-    ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration());
-    String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
     Random random = new Random();
     JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
     TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
     TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
     org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl context =
         new TaskAttemptContextImpl(jobContext.getJobConf(), attemptID);
-    CarbonLoadModel carbonLoadModel =
-        HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
-    CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel);
+    CarbonLoadModel carbonLoadModel = null;
+    String encodedString = jobContext.getJobConf().get(CarbonTableOutputFormat.LOAD_MODEL);
+    if (encodedString != null) {

Review comment:
       Please add a comment explaining why this is needed.
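       For example, a hedged sketch of such a comment (assuming the intent is to reuse a load model that an upstream engine already serialized into the job conf):
       ```java
       // The load model may already be serialized into the job conf by an
       // upstream engine (e.g. the Presto integration); decode and reuse it
       // if present instead of rebuilding it from the Hive configuration.
       if (encodedString != null) {
         carbonLoadModel =
             (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encodedString);
       }
       ```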

##########
File path: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
##########
@@ -76,7 +76,7 @@
 // TODO Move dictionary generator which is coded in spark to MR framework.
 public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, ObjectArrayWritable> {
 
-  protected static final String LOAD_MODEL = "mapreduce.carbontable.load.model";
+  public static final String LOAD_MODEL = "mapreduce.carbontable.load.model";

Review comment:
       Please move all public variables together to the beginning of this class.
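       For example, a sketch of the suggested member order only:
       ```java
       public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, ObjectArrayWritable> {

         // Public constants grouped at the beginning of the class.
         public static final String LOAD_MODEL = "mapreduce.carbontable.load.model";

         // Protected and private members follow below.
       }
       ```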

##########
File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
##########
@@ -52,25 +53,30 @@
 
   @Override
   public void setupJob(JobContext jobContext) throws IOException {
-    ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration());
-    String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
     Random random = new Random();
     JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
     TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
     TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
     org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl context =
         new TaskAttemptContextImpl(jobContext.getJobConf(), attemptID);
-    CarbonLoadModel carbonLoadModel =
-        HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
-    CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel);
+    CarbonLoadModel carbonLoadModel = null;
+    String encodedString = jobContext.getJobConf().get(CarbonTableOutputFormat.LOAD_MODEL);
+    if (encodedString != null) {
+      carbonLoadModel =
+          (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encodedString);
+    }
+    if (null == carbonLoadModel) {
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration());
+      String a = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
+      carbonLoadModel = HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
+      CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel);
+      String loadModelStr = jobContext.getConfiguration().get(CarbonTableOutputFormat.LOAD_MODEL);
+      jobContext.getJobConf().set(JobConf.MAPRED_MAP_TASK_ENV, a + ",carbon=" + loadModelStr);

Review comment:
       Please add a comment explaining what this code is doing.
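       A possible comment, assuming the intent is to hand the serialized load model to the map tasks through their environment:
       ```java
       // Propagate the serialized load model to the map tasks: append it to
       // MAPRED_MAP_TASK_ENV under the "carbon" key so every task can rebuild
       // the same CarbonLoadModel from its environment.
       jobContext.getJobConf().set(JobConf.MAPRED_MAP_TASK_ENV, a + ",carbon=" + loadModelStr);
       ```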

##########
File path: integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
##########
@@ -92,6 +95,11 @@ public void checkOutputSpecs(FileSystem fileSystem, JobConf jobConf) throws IOEx
     }
     String tablePath = FileFactory.getCarbonFile(carbonLoadModel.getTablePath()).getAbsolutePath();
     TaskAttemptID taskAttemptID = TaskAttemptID.forName(jc.get("mapred.task.id"));
+    if (taskAttemptID == null) {
+      SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");

Review comment:
       Please add a comment explaining why this is needed. In what case will it be null?
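       One hedged guess at the comment (assuming the null case is a write that runs outside a real MapReduce task, where "mapred.task.id" is unset):
       ```java
       // "mapred.task.id" is absent when this writer is invoked outside a
       // MapReduce task (assumption: e.g. a direct engine write path), so
       // fall back to a synthetic TaskAttemptID derived from the current time.
       if (taskAttemptID == null) {
         SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
         // ... build the synthetic TaskAttemptID from formatter.format(new Date())
       }
       ```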

##########
File path: integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
##########
@@ -40,6 +40,12 @@
   private String endPoint;
   private String pushRowFilter;
 
+  /**
+   * Property to send load model from coordinator to worker in presto. This is internal constant
+   * and not exposed to user.

Review comment:
       Please describe what the option values are.

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;
+  private final FileSinkOperator.RecordWriter recordWriter;
+  private final CarbonHiveSerDe serDe;
+  private final int fieldCount;
+  private final Object row;
+  private final SettableStructObjectInspector tableInspector;
+  private final List<StructField> structFields;
+  private final HiveWriteUtils.FieldSetter[] setters;
+
+  private boolean isCommitDone;
+
+  public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties,
+      JobConf configuration, TypeManager typeManager) throws SerDeException {
+    this.outPutPath = requireNonNull(outPutPath, "path is null");
+    this.outPutPath = new Path(properties.getProperty("location"));
+    outPutPath = new Path(properties.getProperty("location"));
+    this.configuration = requireNonNull(configuration, "conf is null");
+    List<String> columnNames = Arrays
+        .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA));
+    List<Type> fileColumnTypes =
+        HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream()
+            .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+    fieldCount = columnNames.size();
+    serDe = new CarbonHiveSerDe();
+    serDe.initialize(configuration, properties);
+    tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector();
+
+    structFields = ImmutableList.copyOf(
+        inputColumnNames.stream().map(tableInspector::getStructFieldRef)
+            .collect(toImmutableList()));
+
+    row = tableInspector.create();
+
+    setters = new HiveWriteUtils.FieldSetter[structFields.size()];
+    for (int i = 0; i < setters.length; i++) {
+      setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row, structFields.get(i),
+          fileColumnTypes.get(structFields.get(i).getFieldID()));
+    }
+    String encodedLoadModel = configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL);
+    if (StringUtils.isNotEmpty(encodedLoadModel)) {

Review comment:
       Why not check encodedLoadModel against true or false?

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;
+  private final FileSinkOperator.RecordWriter recordWriter;
+  private final CarbonHiveSerDe serDe;
+  private final int fieldCount;
+  private final Object row;
+  private final SettableStructObjectInspector tableInspector;
+  private final List<StructField> structFields;
+  private final HiveWriteUtils.FieldSetter[] setters;
+
+  private boolean isCommitDone;
+
+  public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties,
+      JobConf configuration, TypeManager typeManager) throws SerDeException {
+    this.outPutPath = requireNonNull(outPutPath, "path is null");

Review comment:
       Do not assign this.outPutPath repeatedly.
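       A minimal sketch of the single assignment, assuming the "location" table property is the path the writer should actually use (the outPutPath parameter then only feeds the null check):
       ```java
       requireNonNull(outPutPath, "path is null");
       // Assign the field exactly once, from the property the writer really uses.
       this.outPutPath = new Path(requireNonNull(properties.getProperty("location"), "location is null"));
       ```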

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;
+  private final FileSinkOperator.RecordWriter recordWriter;
+  private final CarbonHiveSerDe serDe;
+  private final int fieldCount;
+  private final Object row;
+  private final SettableStructObjectInspector tableInspector;
+  private final List<StructField> structFields;
+  private final HiveWriteUtils.FieldSetter[] setters;
+
+  private boolean isCommitDone;
+
+  public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties,
+      JobConf configuration, TypeManager typeManager) throws SerDeException {
+    this.outPutPath = requireNonNull(outPutPath, "path is null");
+    this.outPutPath = new Path(properties.getProperty("location"));
+    outPutPath = new Path(properties.getProperty("location"));
+    this.configuration = requireNonNull(configuration, "conf is null");
+    List<String> columnNames = Arrays
+        .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA));
+    List<Type> fileColumnTypes =
+        HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream()
+            .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+    fieldCount = columnNames.size();
+    serDe = new CarbonHiveSerDe();
+    serDe.initialize(configuration, properties);
+    tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector();
+
+    structFields = ImmutableList.copyOf(
+        inputColumnNames.stream().map(tableInspector::getStructFieldRef)
+            .collect(toImmutableList()));
+
+    row = tableInspector.create();
+
+    setters = new HiveWriteUtils.FieldSetter[structFields.size()];
+    for (int i = 0; i < setters.length; i++) {
+      setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row, structFields.get(i),
+          fileColumnTypes.get(structFields.get(i).getFieldID()));
+    }
+    String encodedLoadModel = configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL);
+    if (StringUtils.isNotEmpty(encodedLoadModel)) {
+      configuration.set(CarbonTableOutputFormat.LOAD_MODEL, encodedLoadModel);
+    }
+    try {
+      boolean compress = HiveConf.getBoolVar(configuration, COMPRESSRESULT);
+      Object writer =
+          Class.forName(MapredCarbonOutputFormat.class.getName()).getConstructor().newInstance();
+      recordWriter = ((MapredCarbonOutputFormat<?>) writer)
+          .getHiveRecordWriter(this.configuration, outPutPath, Text.class, compress,
+              properties, Reporter.NULL);
+    } catch (Exception e) {
+      LOG.error("error while initializing writer", e);
+      throw new RuntimeException("writer class not found");
+    }
+  }
+
+  @Override public long getWrittenBytes() {

Review comment:
       Please move @Override to the previous line.
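       i.e. the conventional layout (formatting only, method body unchanged):
       ```java
       @Override
       public long getWrittenBytes() {
         // method body unchanged
       }
       ```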

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataHandleResolver.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import io.prestosql.plugin.hive.HiveHandleResolver;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+
+public class CarbonDataHandleResolver extends HiveHandleResolver {
+
+  @Override public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass() {

Review comment:
       Please move all @Override annotations to the previous line.

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonMetadataFactory.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+import com.google.inject.Inject;
+import io.airlift.concurrent.BoundedExecutor;
+import io.airlift.json.JsonCodec;
+import io.airlift.log.Logger;
+import io.prestosql.plugin.hive.ForHive;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveConfig;
+import io.prestosql.plugin.hive.HiveMetadata;
+import io.prestosql.plugin.hive.HiveMetadataFactory;
+import io.prestosql.plugin.hive.HivePartitionManager;
+import io.prestosql.plugin.hive.LocationService;
+import io.prestosql.plugin.hive.NodeVersion;
+import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.TypeTranslator;
+import io.prestosql.plugin.hive.metastore.CachingHiveMetastore;
+import io.prestosql.plugin.hive.metastore.HiveMetastore;
+import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
+import io.prestosql.plugin.hive.security.AccessControlMetadataFactory;
+import io.prestosql.plugin.hive.statistics.MetastoreHiveStatisticsProvider;
+import io.prestosql.spi.type.TypeManager;
+import org.joda.time.DateTimeZone;
+
+public class CarbonMetadataFactory extends HiveMetadataFactory {
+
+  private static final Logger log = Logger.get(HiveMetadataFactory.class);
+  private final boolean allowCorruptWritesForTesting;
+  private final boolean skipDeletionForAlter;
+  private final boolean skipTargetCleanupOnRollback;
+  private final boolean writesToNonManagedTablesEnabled = true;
+  private final boolean createsOfNonManagedTablesEnabled;
+  private final long perTransactionCacheMaximumSize;
+  private final HiveMetastore metastore;
+  private final HdfsEnvironment hdfsEnvironment;
+  private final HivePartitionManager partitionManager;
+  private final DateTimeZone timeZone;
+  private final TypeManager typeManager;
+  private final LocationService locationService;
+  private final BoundedExecutor renameExecution;
+  private final TypeTranslator typeTranslator;
+  private final String prestoVersion;
+  private final AccessControlMetadataFactory accessControlMetadataFactory;
+  private final JsonCodec partitionUpdateCodec;
+
+  @Inject public CarbonMetadataFactory(HiveConfig hiveConfig, HiveMetastore metastore,
+      HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager,
+      @ForHive ExecutorService executorService, TypeManager typeManager,
+      LocationService locationService, JsonCodec<PartitionUpdate> partitionUpdateCodec,
+      TypeTranslator typeTranslator, NodeVersion nodeVersion,
+      AccessControlMetadataFactory accessControlMetadataFactory) {
+    this(metastore, hdfsEnvironment, partitionManager, hiveConfig.getDateTimeZone(),
+        hiveConfig.getMaxConcurrentFileRenames(), hiveConfig.getAllowCorruptWritesForTesting(),
+        hiveConfig.isSkipDeletionForAlter(), hiveConfig.isSkipTargetCleanupOnRollback(),
+        hiveConfig.getWritesToNonManagedTablesEnabled(),
+        hiveConfig.getCreatesOfNonManagedTablesEnabled(),
+        hiveConfig.getPerTransactionMetastoreCacheMaximumSize(), typeManager, locationService,
+        partitionUpdateCodec, executorService, typeTranslator, nodeVersion.toString(),
+        accessControlMetadataFactory);
+  }
+
+  public CarbonMetadataFactory(HiveMetastore metastore, HdfsEnvironment hdfsEnvironment,
+      HivePartitionManager partitionManager, DateTimeZone timeZone, int maxConcurrentFileRenames,
+      boolean allowCorruptWritesForTesting, boolean skipDeletionForAlter,

Review comment:
       Too many parameters.
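       If a parameter object is the suggested fix (as in the similar comments below), a purely illustrative sketch, with hypothetical names grouping the primitive settings already present as fields of this class:
       ```java
       // Hypothetical holder for the primitive settings, so the constructor
       // takes one config object plus the real collaborators.
       public final class CarbonMetadataSettings {
         private final boolean allowCorruptWritesForTesting;
         private final boolean skipDeletionForAlter;
         private final boolean skipTargetCleanupOnRollback;
         private final boolean createsOfNonManagedTablesEnabled;
         private final long perTransactionCacheMaximumSize;

         public CarbonMetadataSettings(boolean allowCorruptWritesForTesting,
             boolean skipDeletionForAlter, boolean skipTargetCleanupOnRollback,
             boolean createsOfNonManagedTablesEnabled, long perTransactionCacheMaximumSize) {
           this.allowCorruptWritesForTesting = allowCorruptWritesForTesting;
           this.skipDeletionForAlter = skipDeletionForAlter;
           this.skipTargetCleanupOnRollback = skipTargetCleanupOnRollback;
           this.createsOfNonManagedTablesEnabled = createsOfNonManagedTablesEnabled;
           this.perTransactionCacheMaximumSize = perTransactionCacheMaximumSize;
         }

         public boolean isAllowCorruptWritesForTesting() {
           return allowCorruptWritesForTesting;
         }
         // ... remaining getters elided
       }
       ```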

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data

Review comment:
       "age data"? Is this a typo for "page data"?

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;

Review comment:
       This can be final
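       i.e. once the constructor assigns it exactly once (see the earlier comment), the field could be declared as:
       ```java
       private final Path outPutPath;
       ```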

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;
+  private final FileSinkOperator.RecordWriter recordWriter;
+  private final CarbonHiveSerDe serDe;
+  private final int fieldCount;
+  private final Object row;
+  private final SettableStructObjectInspector tableInspector;
+  private final List<StructField> structFields;
+  private final HiveWriteUtils.FieldSetter[] setters;
+
+  private boolean isCommitDone;
+
+  public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties,
+      JobConf configuration, TypeManager typeManager) throws SerDeException {
+    this.outPutPath = requireNonNull(outPutPath, "path is null");
+    this.outPutPath = new Path(properties.getProperty("location"));
+    outPutPath = new Path(properties.getProperty("location"));
+    this.configuration = requireNonNull(configuration, "conf is null");
+    List<String> columnNames = Arrays
+        .asList(properties.getProperty(IOConstants.COLUMNS, "").split(CarbonCommonConstants.COMMA));
+    List<Type> fileColumnTypes =
+        HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, "")).stream()
+            .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+    fieldCount = columnNames.size();

Review comment:
       Use `this.` for all field assignments in the constructor.
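       For example, mirroring the assignments already shown above:
       ```java
       this.fieldCount = columnNames.size();
       this.serDe = new CarbonHiveSerDe();
       this.serDe.initialize(configuration, properties);
       this.tableInspector = (ArrayWritableObjectInspector) this.serDe.getObjectInspector();
       ```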

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataLocationService.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveLocationService;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.plugin.hive.LocationHandle;
+import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
+import io.prestosql.plugin.hive.metastore.Table;
+import io.prestosql.spi.connector.ConnectorSession;
+import org.apache.hadoop.fs.Path;
+
+public class CarbonDataLocationService extends HiveLocationService {
+
+  private final HdfsEnvironment hdfsEnvironment;
+
+  @Inject
+  public CarbonDataLocationService(HdfsEnvironment hdfsEnvironment) {
+    super(hdfsEnvironment);
+    this.hdfsEnvironment = hdfsEnvironment;
+  }
+
+  @Override
+  public LocationHandle forNewTable(SemiTransactionalHiveMetastore metastore,
+      ConnectorSession session, String schemaName, String tableName) {
+    // TODO: check and make it compatible for cloud scenario

Review comment:
       What is missing here?

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to write the age data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;
+  private final FileSinkOperator.RecordWriter recordWriter;
+  private final CarbonHiveSerDe serDe;
+  private final int fieldCount;
+  private final Object row;
+  private final SettableStructObjectInspector tableInspector;
+  private final List<StructField> structFields;
+  private final HiveWriteUtils.FieldSetter[] setters;
+
+  private boolean isCommitDone;
+
+  public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, Properties properties,
+      JobConf configuration, TypeManager typeManager) throws SerDeException {
+    this.outPutPath = requireNonNull(outPutPath, "path is null");
+    this.outPutPath = new Path(properties.getProperty("location"));
+    outPutPath = new Path(properties.getProperty("location"));

Review comment:
       Do not assign this.outPutPath repeatedly.

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.MapredCarbonOutputCommitter;
+import org.apache.carbondata.hive.util.HiveCarbonUtil;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import com.google.common.collect.ImmutableMap;
+import io.airlift.slice.Slice;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveInsertTableHandle;
+import io.prestosql.plugin.hive.HiveMetadata;
+import io.prestosql.plugin.hive.HivePartitionManager;
+import io.prestosql.plugin.hive.LocationService;
+import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.TypeTranslator;
+import io.prestosql.plugin.hive.metastore.MetastoreUtil;
+import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
+import io.prestosql.plugin.hive.metastore.Table;
+import io.prestosql.plugin.hive.security.AccessControlMetadata;
+import io.prestosql.plugin.hive.statistics.HiveStatisticsProvider;
+import io.prestosql.plugin.hive.util.ConfigurationUtils;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+import io.prestosql.spi.connector.ConnectorOutputMetadata;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorTableHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.statistics.ComputedStatistics;
+import io.prestosql.spi.type.TypeManager;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+
+public class CarbonDataMetaData extends HiveMetadata {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataMetaData.class.getName());
+
+  private HdfsEnvironment hdfsEnvironment;
+  private SemiTransactionalHiveMetastore metastore;
+  private MapredCarbonOutputCommitter carbonOutputCommitter;
+  private JobContextImpl jobContext;
+
+  public CarbonDataMetaData(SemiTransactionalHiveMetastore metastore,
+      HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager, DateTimeZone timeZone,
+      boolean allowCorruptWritesForTesting, boolean writesToNonManagedTablesEnabled,

Review comment:
       Too many parameters, should create a parameter object

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriterFactory.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import com.google.inject.Inject;
+import io.prestosql.plugin.hive.FileFormatDataSourceStats;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveFileWriterFactory;
+import io.prestosql.plugin.hive.NodeVersion;
+import io.prestosql.plugin.hive.metastore.StorageFormat;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.mapred.JobConf;
+
+import static java.util.Objects.requireNonNull;
+
+public class CarbonDataFileWriterFactory implements HiveFileWriterFactory {
+
+  private final HdfsEnvironment hdfsEnvironment;
+  private final TypeManager typeManager;
+  private final NodeVersion nodeVersion;
+  private final FileFormatDataSourceStats stats;
+
+  @Inject
+  public CarbonDataFileWriterFactory(HdfsEnvironment hdfsEnvironment, TypeManager typeManager,
+      NodeVersion nodeVersion, FileFormatDataSourceStats stats) {
+    this(typeManager, hdfsEnvironment, nodeVersion, stats);
+  }
+
+  public CarbonDataFileWriterFactory(TypeManager typeManager, HdfsEnvironment hdfsEnvironment,
+      NodeVersion nodeVersion,
+      FileFormatDataSourceStats stats)
+  {

Review comment:
       Please improve the formatting of lines 52 ~ 55.
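       e.g. collapse the declaration onto two lines with the brace on the signature line (formatting only, body unchanged):
       ```java
       public CarbonDataFileWriterFactory(TypeManager typeManager, HdfsEnvironment hdfsEnvironment,
           NodeVersion nodeVersion, FileFormatDataSourceStats stats) {
         // constructor body unchanged
       }
       ```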

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataPageSinkProvider.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.inject.Inject;
+import io.airlift.event.client.EventClient;
+import io.airlift.json.JsonCodec;
+import io.airlift.units.DataSize;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveConfig;
+import io.prestosql.plugin.hive.HiveFileWriterFactory;
+import io.prestosql.plugin.hive.HivePageSink;
+import io.prestosql.plugin.hive.HivePageSinkProvider;
+import io.prestosql.plugin.hive.HiveSessionProperties;
+import io.prestosql.plugin.hive.HiveWritableTableHandle;
+import io.prestosql.plugin.hive.HiveWriterStats;
+import io.prestosql.plugin.hive.LocationService;
+import io.prestosql.plugin.hive.OrcFileWriterFactory;
+import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.metastore.HiveMetastore;
+import io.prestosql.plugin.hive.metastore.HivePageSinkMetadataProvider;
+import io.prestosql.plugin.hive.metastore.SortingColumn;
+import io.prestosql.spi.NodeManager;
+import io.prestosql.spi.PageIndexerFactory;
+import io.prestosql.spi.PageSorter;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+import io.prestosql.spi.connector.ConnectorPageSink;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorTransactionHandle;
+import io.prestosql.spi.type.TypeManager;
+
+import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
+import static io.airlift.concurrent.Threads.daemonThreadsNamed;
+import static io.prestosql.plugin.hive.metastore.CachingHiveMetastore.memoizeMetastore;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+
+public class CarbonDataPageSinkProvider extends HivePageSinkProvider {
+
+  private final Set<HiveFileWriterFactory> fileWriterFactories;
+  private final HdfsEnvironment hdfsEnvironment;
+  private final PageSorter pageSorter;
+  private final HiveMetastore metastore;
+  private final PageIndexerFactory pageIndexerFactory;
+  private final TypeManager typeManager;
+  private final int maxOpenPartitions;
+  private final int maxOpenSortFiles;
+  private final DataSize writerSortBufferSize;
+  private final boolean immutablePartitions;
+  private final LocationService locationService;
+  private final ListeningExecutorService writeVerificationExecutor;
+  private final JsonCodec<PartitionUpdate> partitionUpdateCodec;
+  private final NodeManager nodeManager;
+  private final EventClient eventClient;
+  private final HiveSessionProperties hiveSessionProperties;
+  private final HiveWriterStats hiveWriterStats;
+  private final OrcFileWriterFactory orcFileWriterFactory;
+  private final long perTransactionMetastoreCacheMaximumSize;
+
+  @Inject
+  public CarbonDataPageSinkProvider(Set<HiveFileWriterFactory> fileWriterFactories,
+      HdfsEnvironment hdfsEnvironment, PageSorter pageSorter, HiveMetastore metastore,

Review comment:
       Too many parameters, should create a parameter object.

##########
File path: integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to 
write the page data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;
+  private final FileSinkOperator.RecordWriter recordWriter;
+  private final CarbonHiveSerDe serDe;
+  private final int fieldCount;
+  private final Object row;
+  private final SettableStructObjectInspector tableInspector;
+  private final List<StructField> structFields;
+  private final HiveWriteUtils.FieldSetter[] setters;
+
+  private boolean isCommitDone;
+
+  public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, 
Properties properties,
+      JobConf configuration, TypeManager typeManager) throws SerDeException {
+    this.outPutPath = requireNonNull(outPutPath, "path is null");
+    this.outPutPath = new Path(properties.getProperty("location"));
+    outPutPath = new Path(properties.getProperty("location"));
+    this.configuration = requireNonNull(configuration, "conf is null");
+    List<String> columnNames = Arrays
+        .asList(properties.getProperty(IOConstants.COLUMNS, 
"").split(CarbonCommonConstants.COMMA));
+    List<Type> fileColumnTypes =
+        HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, 
"")).stream()
+            .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+    fieldCount = columnNames.size();
+    serDe = new CarbonHiveSerDe();
+    serDe.initialize(configuration, properties);
+    tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector();
+
+    structFields = ImmutableList.copyOf(
+        inputColumnNames.stream().map(tableInspector::getStructFieldRef)
+            .collect(toImmutableList()));
+
+    row = tableInspector.create();
+
+    setters = new HiveWriteUtils.FieldSetter[structFields.size()];
+    for (int i = 0; i < setters.length; i++) {
+      setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row, 
structFields.get(i),
+          fileColumnTypes.get(structFields.get(i).getFieldID()));
+    }
+    String encodedLoadModel = 
configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL);
+    if (StringUtils.isNotEmpty(encodedLoadModel)) {
+      configuration.set(CarbonTableOutputFormat.LOAD_MODEL, encodedLoadModel);
+    }
+    try {
+      boolean compress = HiveConf.getBoolVar(configuration, COMPRESSRESULT);
+      Object writer =
+          
Class.forName(MapredCarbonOutputFormat.class.getName()).getConstructor().newInstance();
+      recordWriter = ((MapredCarbonOutputFormat<?>) writer)
+          .getHiveRecordWriter(this.configuration, outPutPath, Text.class, 
compress,
+              properties, Reporter.NULL);
+    } catch (Exception e) {
+      LOG.error("error while initializing writer", e);
+      throw new RuntimeException("writer class not found");
+    }
+  }
+
+  @Override public long getWrittenBytes() {
+    if (isCommitDone) {
+      try {
+        return 
outPutPath.getFileSystem(configuration).getFileStatus(outPutPath).getLen();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+    return 0;
+  }
+
+  @Override public long getSystemMemoryUsage() {
+    return 0;
+  }
+
+  @Override public void appendRows(Page dataPage) {
+    for (int position = 0; position < dataPage.getPositionCount(); position++) 
{
+      appendRow(dataPage, position);
+    }
+  }
+
+  public void appendRow(Page dataPage, int position) {

Review comment:
       Why is this method public? It appears to be called only from `appendRows`, 
so it can be private.

##########
File path: 
integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.integrationtest
+
+import java.io.File
+import java.util
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuiteLike}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonTableIdentifier}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.presto.server.PrestoServer
+import org.apache.carbondata.presto.util.CarbonDataStoreCreator
+
+class PrestoInsertIntoTableTestCase extends FunSuiteLike with 
BeforeAndAfterAll with BeforeAndAfterEach {
+
+  private val logger = LogServiceFactory
+    .getLogService(classOf[PrestoAllDataTypeTest].getCanonicalName)
+
+  private val rootPath = new File(this.getClass.getResource("/").getPath
+                                  + "../../../..").getCanonicalPath
+  private val storePath = s"$rootPath/integration/presto/target/store"
+  private val systemPath = s"$rootPath/integration/presto/target/system"
+  private val prestoServer = new PrestoServer
+
+  override def beforeAll: Unit = {
+    
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
+      "Presto")
+    val map = new util.HashMap[String, String]()
+    map.put("hive.metastore", "file")
+    map.put("hive.metastore.catalog.dir", s"file://$storePath")
+    map.put("hive.allow-drop-table", "true")
+    prestoServer.startServer("testdb", map)
+    prestoServer.execute("drop schema if exists testdb")
+    prestoServer.execute("create schema testdb")
+  }
+
+  override protected def beforeEach(): Unit = {
+    val query = "create table testdb.testtable(ID int, date date, country 
varchar, name varchar, phonetype varchar, serialname varchar,salary 
decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, 
shortField smallint, iscurrentemployee boolean) with(format='CARBONDATA') "
+    createTable(query, "testdb", "testtable")
+  }
+
+  private def createTable(query: String, databaseName: String, tableName: 
String): Unit = {
+    prestoServer.execute(s"drop table if exists ${databaseName}.${tableName}")
+    prestoServer.execute(query)
+    logger.info("Creating The Carbon Store")
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = 
getAbsoluteIdentifier(databaseName, tableName)
+    CarbonDataStoreCreator.createTable(absoluteTableIdentifier, true)
+    logger.info(s"\nCarbon store is created at location: $storePath")
+  }
+
+  private def getAbsoluteIdentifier(dbName: String,
+      tableName: String) = {
+    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(
+      storePath + "/" + dbName + "/" + tableName,
+      new CarbonTableIdentifier(dbName,
+        tableName,
+        UUID.randomUUID().toString))
+    absoluteTableIdentifier
+  }
+
+  test("test insert with different storage format names") {
+    val query1 = "create table testdb.testtable(ID int, date date, country 
varchar, name varchar, phonetype varchar, serialname varchar,salary 
decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, 
shortField smallint, iscurrentemployee boolean) with(format='CARBONDATA') "
+    val query2 = "create table testdb.testtable(ID int, date date, country 
varchar, name varchar, phonetype varchar, serialname varchar,salary 
decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, 
shortField smallint, iscurrentemployee boolean) with(format='CARBON') "
+    val query3 = "create table testdb.testtable(ID int, date date, country 
varchar, name varchar, phonetype varchar, serialname varchar,salary 
decimal(6,1), bonus decimal(8,6), monthlyBonus decimal(5,3), dob timestamp, 
shortField smallint, iscurrentemployee boolean) 
with(format='ORG.APACHE.CARBONDATA.FORMAT') "
+    createTable(query1, "testdb", "testtable")
+    createTable(query2, "testdb", "testtable")
+    createTable(query3, "testdb", "testtable")
+    prestoServer.execute("insert into testdb.testtable values(10, 
current_date, 'INDIA', 'Chandler', 'qwerty', 
'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint 
'23', true)")
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = 
getAbsoluteIdentifier("testdb", "testtable")
+    val carbonTable = 
SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+    val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, 
"0")
+    assert(FileFactory.getCarbonFile(segmentPath).isFileExist)
+  }
+
+  test("test insert into one segment and check folder structure") {
+    prestoServer.execute("insert into testdb.testtable values(10, 
current_date, 'INDIA', 'Chandler', 'qwerty', 
'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint 
'23', true)")
+    prestoServer.execute("insert into testdb.testtable values(10, 
current_date, 'INDIA', 'Chandler', 'qwerty', 
'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint 
'23', true)")
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = 
getAbsoluteIdentifier("testdb", "testtable")
+    val carbonTable = 
SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+    val tablePath = carbonTable.getTablePath
+    val segment0Path = CarbonTablePath.getSegmentPath(tablePath, "0")
+    val segment1Path = CarbonTablePath.getSegmentPath(tablePath, "1")
+    val segment0 = FileFactory.getCarbonFile(segment0Path)
+    assert(segment0.isFileExist)
+    assert(segment0.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) ||
+        file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
+      }
+    }).length == 2)
+    val segment1 = FileFactory.getCarbonFile(segment1Path)
+    assert(segment1.isFileExist)
+    assert(segment1.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT) ||
+        file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
+      }
+    }).length == 2)
+    val segmentsPath = CarbonTablePath.getSegmentFilesLocation(tablePath)
+    assert(FileFactory.getCarbonFile(segmentsPath).isFileExist && 
FileFactory.getCarbonFile(segmentsPath).listFiles(true).size() == 2)
+    val metadataFolderPath = CarbonTablePath.getMetadataPath(tablePath)
+    FileFactory.getCarbonFile(metadataFolderPath).listFiles(new 
CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonTablePath.TABLE_STATUS_FILE)
+      }
+    })
+  }
+
+  test("test insert into many segments and check segment count and data 
count") {
+    prestoServer.execute("insert into testdb.testtable values(10, 
current_date, 'INDIA', 'Chandler', 'qwerty', 
'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint 
'23', true)")
+    prestoServer.execute("insert into testdb.testtable values(10, 
current_date, 'INDIA', 'Chandler', 'qwerty', 
'usn20392',10000.0,16.234567,25.678,timestamp '1998-12-16 10:12:09',smallint 
'23', true)")
+    prestoServer.execute("insert into testdb.testtable values(10, 
current_date, 'INDIA', 'Chandler', 'qwerty', 
'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint 
'23', true)")
+    prestoServer.execute("insert into testdb.testtable values(10, 
current_date, 'INDIA', 'Chandler', 'qwerty', 
'usn20392',10000.0,16.234567,25.678,timestamp '1998-12-16 10:12:09',smallint 
'23', true)")
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = 
getAbsoluteIdentifier("testdb", "testtable")
+    val carbonTable = 
SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+    val segmentFoldersLocation = 
CarbonTablePath.getPartitionDir(carbonTable.getTablePath)
+    
assert(FileFactory.getCarbonFile(segmentFoldersLocation).listFiles(false).size()
 == 8)
+    val actualResult1: List[Map[String, Any]] = prestoServer
+      .executeQuery("select count(*) AS RESULT from testdb.testtable")
+    val expectedResult1: List[Map[String, Any]] = List(Map("RESULT" -> 4))
+    assert(actualResult1.equals(expectedResult1))
+    // filter query
+    val actualResult2: List[Map[String, Any]] = prestoServer
+      .executeQuery("select count(*) AS RESULT from testdb.testtable WHERE dob 
= timestamp '1998-12-16 10:12:09'")
+    val expectedResult2: List[Map[String, Any]] = List(Map("RESULT" -> 2))
+    assert(actualResult2.equals(expectedResult2))
+  }
+
+  test("test if the table status contains the segment file name for each 
load") {
+    prestoServer.execute("insert into testdb.testtable values(10, 
current_date, 'INDIA', 'Chandler', 'qwerty', 
'usn20392',10000.0,16.234567,25.678,timestamp '1994-06-14 05:00:09',smallint 
'23', true)")
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = 
getAbsoluteIdentifier("testdb", "testtable")
+    val carbonTable = 
SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier)
+    val ssm = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
+    ssm.getValidAndInvalidSegments.getValidSegments.asScala.foreach { segment 
=>
+      val loadMetadataDetails = segment.getLoadMetadataDetails
+      assert(loadMetadataDetails.getSegmentFile != null)
+    }
+  }
+

Review comment:
       To test transactional write, you can add a test case that queries the 
table while data is being inserted. In that case, the query result should not 
change until the insert succeeds.
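
       A rough ScalaTest sketch of the idea (the threading and the deliberately 
loose assertion are illustrative; pinning the observation strictly before the 
commit would need a hook into segment commit):

```scala
test("test that a concurrent query does not observe an uncommitted insert") {
  prestoServer.execute("insert into testdb.testtable values(10, current_date, " +
    "'INDIA', 'Chandler', 'qwerty', 'usn20392',10000.0,16.234567,25.678," +
    "timestamp '1994-06-14 05:00:09',smallint '23', true)")
  // run a second insert on another thread and query while it may be in flight
  val insertThread = new Thread(new Runnable {
    override def run(): Unit = {
      prestoServer.execute("insert into testdb.testtable values(20, current_date, " +
        "'INDIA', 'Ross', 'qwerty', 'usn20392',10000.0,16.234567,25.678," +
        "timestamp '1994-06-14 05:00:09',smallint '23', true)")
    }
  })
  insertThread.start()
  // before the second insert commits the count must be 1; after it commits, 2.
  // Any other value means a reader observed a partially written segment.
  val duringInsert = prestoServer
    .executeQuery("select count(*) AS RESULT from testdb.testtable")
  assert(Set(1, 2).contains(duringInsert.head("RESULT").toString.toInt))
  insertThread.join()
  val afterInsert = prestoServer
    .executeQuery("select count(*) AS RESULT from testdb.testtable")
  assert(afterInsert.equals(List(Map("RESULT" -> 2))))
}
```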

##########
File path: 
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import 
org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to 
write the page data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;
+  private final FileSinkOperator.RecordWriter recordWriter;
+  private final CarbonHiveSerDe serDe;
+  private final int fieldCount;
+  private final Object row;
+  private final SettableStructObjectInspector tableInspector;
+  private final List<StructField> structFields;
+  private final HiveWriteUtils.FieldSetter[] setters;
+
+  private boolean isCommitDone;
+
+  public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, 
Properties properties,
+      JobConf configuration, TypeManager typeManager) throws SerDeException {
+    this.outPutPath = requireNonNull(outPutPath, "path is null");
+    this.outPutPath = new Path(properties.getProperty("location"));
+    outPutPath = new Path(properties.getProperty("location"));
+    this.configuration = requireNonNull(configuration, "conf is null");
+    List<String> columnNames = Arrays
+        .asList(properties.getProperty(IOConstants.COLUMNS, 
"").split(CarbonCommonConstants.COMMA));
+    List<Type> fileColumnTypes =
+        HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, 
"")).stream()
+            .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+    fieldCount = columnNames.size();
+    serDe = new CarbonHiveSerDe();
+    serDe.initialize(configuration, properties);
+    tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector();
+
+    structFields = ImmutableList.copyOf(
+        inputColumnNames.stream().map(tableInspector::getStructFieldRef)
+            .collect(toImmutableList()));
+
+    row = tableInspector.create();
+
+    setters = new HiveWriteUtils.FieldSetter[structFields.size()];
+    for (int i = 0; i < setters.length; i++) {
+      setters[i] = HiveWriteUtils.createFieldSetter(tableInspector, row, 
structFields.get(i),
+          fileColumnTypes.get(structFields.get(i).getFieldID()));
+    }
+    String encodedLoadModel = 
configuration.get(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL);
+    if (StringUtils.isNotEmpty(encodedLoadModel)) {
+      configuration.set(CarbonTableOutputFormat.LOAD_MODEL, encodedLoadModel);
+    }
+    try {
+      boolean compress = HiveConf.getBoolVar(configuration, COMPRESSRESULT);
+      Object writer =
+          
Class.forName(MapredCarbonOutputFormat.class.getName()).getConstructor().newInstance();
+      recordWriter = ((MapredCarbonOutputFormat<?>) writer)
+          .getHiveRecordWriter(this.configuration, outPutPath, Text.class, 
compress,
+              properties, Reporter.NULL);
+    } catch (Exception e) {
+      LOG.error("error while initializing writer", e);
+      throw new RuntimeException("writer class not found");
+    }
+  }
+
+  @Override public long getWrittenBytes() {
+    if (isCommitDone) {
+      try {
+        return 
outPutPath.getFileSystem(configuration).getFileStatus(outPutPath).getLen();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+    return 0;
+  }
+
+  @Override public long getSystemMemoryUsage() {
+    return 0;
+  }
+
+  @Override public void appendRows(Page dataPage) {
+    for (int position = 0; position < dataPage.getPositionCount(); position++) 
{
+      appendRow(dataPage, position);
+    }
+  }
+
+  public void appendRow(Page dataPage, int position) {
+    for (int field = 0; field < fieldCount; field++) {
+      Block block = dataPage.getBlock(field);
+      if (block.isNull(position)) {
+        tableInspector.setStructFieldData(row, structFields.get(field), null);
+      } else {
+        setters[field].setField(block, position);
+      }
+    }
+
+    try {
+      recordWriter.write(serDe.serialize(row, tableInspector));
+    } catch (SerDeException | IOException e) {
+      throw new PrestoException(HIVE_WRITER_DATA_ERROR, e);
+    }
+  }
+
+  @Override public void commit() {
+    try {
+      recordWriter.close(false);
+    } catch (Exception ex) {
+      LOG.error("Error while closing the record writer", ex);
+      throw new RuntimeException(ex);
+    }
+    isCommitDone = true;
+  }
+
+  @Override public void rollback() {
+

Review comment:
       Should throw an `UnsupportedOperationException` here instead of silently 
doing nothing.
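
       For example (the message text is illustrative):

```java
@Override
public void rollback() {
  // No cleanup is wired in for a failed carbon write yet, so fail loudly
  // instead of silently ignoring the rollback request.
  throw new UnsupportedOperationException(
      "rollback is not supported by CarbonDataFileWriter");
}
```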

##########
File path: 
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataMetaData.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.MapredCarbonOutputCommitter;
+import org.apache.carbondata.hive.util.HiveCarbonUtil;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import com.google.common.collect.ImmutableMap;
+import io.airlift.slice.Slice;
+import io.prestosql.plugin.hive.HdfsEnvironment;
+import io.prestosql.plugin.hive.HiveInsertTableHandle;
+import io.prestosql.plugin.hive.HiveMetadata;
+import io.prestosql.plugin.hive.HivePartitionManager;
+import io.prestosql.plugin.hive.LocationService;
+import io.prestosql.plugin.hive.PartitionUpdate;
+import io.prestosql.plugin.hive.TypeTranslator;
+import io.prestosql.plugin.hive.metastore.MetastoreUtil;
+import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
+import io.prestosql.plugin.hive.metastore.Table;
+import io.prestosql.plugin.hive.security.AccessControlMetadata;
+import io.prestosql.plugin.hive.statistics.HiveStatisticsProvider;
+import io.prestosql.plugin.hive.util.ConfigurationUtils;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+import io.prestosql.spi.connector.ConnectorOutputMetadata;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorTableHandle;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.statistics.ComputedStatistics;
+import io.prestosql.spi.type.TypeManager;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTimeZone;
+
+public class CarbonDataMetaData extends HiveMetadata {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataMetaData.class.getName());
+
+  private HdfsEnvironment hdfsEnvironment;
+  private SemiTransactionalHiveMetastore metastore;
+  private MapredCarbonOutputCommitter carbonOutputCommitter;
+  private JobContextImpl jobContext;
+
+  public CarbonDataMetaData(SemiTransactionalHiveMetastore metastore,
+      HdfsEnvironment hdfsEnvironment, HivePartitionManager partitionManager, 
DateTimeZone timeZone,
+      boolean allowCorruptWritesForTesting, boolean 
writesToNonManagedTablesEnabled,
+      boolean createsOfNonManagedTablesEnabled, TypeManager typeManager,
+      LocationService locationService,
+      io.airlift.json.JsonCodec<PartitionUpdate> partitionUpdateCodec,
+      TypeTranslator typeTranslator, String prestoVersion,
+      HiveStatisticsProvider hiveStatisticsProvider, AccessControlMetadata 
accessControlMetadata) {
+    super(metastore, hdfsEnvironment, partitionManager, timeZone, 
allowCorruptWritesForTesting,
+        true, createsOfNonManagedTablesEnabled, typeManager,
+        locationService, partitionUpdateCodec, typeTranslator, prestoVersion,
+        hiveStatisticsProvider, accessControlMetadata);
+    this.hdfsEnvironment = hdfsEnvironment;
+    this.metastore = metastore;
+  }
+
+  @Override
+  public CarbonDataInsertTableHandle beginInsert(ConnectorSession session,
+      ConnectorTableHandle tableHandle) {
+    HiveInsertTableHandle hiveInsertTableHandle = super.beginInsert(session, 
tableHandle);
+    SchemaTableName tableName = hiveInsertTableHandle.getSchemaTableName();
+    Optional<Table> table =
+        this.metastore.getTable(tableName.getSchemaName(), 
tableName.getTableName());
+    Path outputPath =
+        new 
Path(hiveInsertTableHandle.getLocationHandle().getJsonSerializableTargetPath());
+    JobConf jobConf = ConfigurationUtils.toJobConf(this.hdfsEnvironment
+        .getConfiguration(
+            new HdfsEnvironment.HdfsContext(session, 
hiveInsertTableHandle.getSchemaName(),
+                hiveInsertTableHandle.getTableName()),
+            new 
Path(hiveInsertTableHandle.getLocationHandle().getJsonSerializableWritePath())));
+    jobConf.set("location", outputPath.toString());
+    Properties hiveSchema = MetastoreUtil.getHiveSchema(table.get());
+    try {
+      CarbonLoadModel carbonLoadModel =
+          HiveCarbonUtil.getCarbonLoadModel(hiveSchema, jobConf);
+
+      CarbonTableOutputFormat.setLoadModel(jobConf, carbonLoadModel);
+    } catch (IOException ex) {
+      LOG.error("Error while creating carbon load model", ex);
+      throw new RuntimeException(ex);
+    }
+    try {
+      carbonOutputCommitter = new MapredCarbonOutputCommitter();
+      jobContext = new JobContextImpl(jobConf, new JobID());
+      carbonOutputCommitter.setupJob(jobContext);
+      ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobConf);
+    } catch (IOException e) {
+      LOG.error("error setting the output committer", e);
+      throw new RuntimeException("error setting the output committer");
+    }
+    return new 
CarbonDataInsertTableHandle(hiveInsertTableHandle.getSchemaTableName().getSchemaName(),
+        hiveInsertTableHandle.getTableName(),
+        hiveInsertTableHandle.getInputColumns(),
+        hiveInsertTableHandle.getPageSinkMetadata(),
+        hiveInsertTableHandle.getLocationHandle(),
+        hiveInsertTableHandle.getBucketProperty(), 
hiveInsertTableHandle.getTableStorageFormat(),
+        hiveInsertTableHandle.getPartitionStorageFormat(),
+        ImmutableMap.of(CarbonTableConfig.CARBON_PRESTO_LOAD_MODEL,
+            
jobContext.getConfiguration().get(CarbonTableOutputFormat.LOAD_MODEL)));
+  }
+
+  @Override
+  public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession 
session,
+      ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments,
+      Collection<ComputedStatistics> computedStatistics) {
+    Optional<ConnectorOutputMetadata> connectorOutputMetadata =
+        super.finishInsert(session, insertHandle, fragments, 
computedStatistics);
+    try {
+      carbonOutputCommitter.commitJob(jobContext);
+    } catch (IOException e) {
+      LOG.error("Error occurred while committing the insert job.", e);
+      throw new RuntimeException(e);

Review comment:
       If an exception is thrown here, we have already called 
`super.finishInsert`. So would it be better to call `super.finishInsert` after 
`commitJob`?
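
       i.e. something along these lines, assuming `commitJob` does not depend on 
anything done inside `super.finishInsert`:

```java
@Override
public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
    ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments,
    Collection<ComputedStatistics> computedStatistics) {
  try {
    // commit the carbon segment first, so a failure here surfaces
    // before the Hive-side insert is finalized
    carbonOutputCommitter.commitJob(jobContext);
  } catch (IOException e) {
    LOG.error("Error occurred while committing the insert job.", e);
    throw new RuntimeException(e);
  }
  return super.finishInsert(session, insertHandle, fragments, computedStatistics);
}
```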

##########
File path: 
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataFileWriter.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hive.CarbonHiveSerDe;
+import org.apache.carbondata.hive.MapredCarbonOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.plugin.hive.HiveFileWriter;
+import io.prestosql.plugin.hive.HiveType;
+import io.prestosql.plugin.hive.HiveWriteUtils;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeManager;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import 
org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.COMPRESSRESULT;
+
+/**
+ * This class implements HiveFileWriter and it creates the carbonFileWriter to 
write the page data
+ * sent from presto.
+ */
+public class CarbonDataFileWriter implements HiveFileWriter {
+
+  private static final Logger LOG =
+      LogServiceFactory.getLogService(CarbonDataFileWriter.class.getName());
+
+  private final JobConf configuration;
+  private Path outPutPath;
+  private final FileSinkOperator.RecordWriter recordWriter;
+  private final CarbonHiveSerDe serDe;
+  private final int fieldCount;
+  private final Object row;
+  private final SettableStructObjectInspector tableInspector;
+  private final List<StructField> structFields;
+  private final HiveWriteUtils.FieldSetter[] setters;
+
+  private boolean isCommitDone;
+
+  public CarbonDataFileWriter(Path outPutPath, List<String> inputColumnNames, 
Properties properties,
+      JobConf configuration, TypeManager typeManager) throws SerDeException {
+    this.outPutPath = requireNonNull(outPutPath, "path is null");
+    this.outPutPath = new Path(properties.getProperty("location"));
+    outPutPath = new Path(properties.getProperty("location"));
+    this.configuration = requireNonNull(configuration, "conf is null");
+    List<String> columnNames = Arrays
+        .asList(properties.getProperty(IOConstants.COLUMNS, 
"").split(CarbonCommonConstants.COMMA));
+    List<Type> fileColumnTypes =
+        HiveType.toHiveTypes(properties.getProperty(IOConstants.COLUMNS_TYPES, 
"")).stream()
+            .map(hiveType -> hiveType.getType(typeManager)).collect(toList());
+    fieldCount = columnNames.size();
+    serDe = new CarbonHiveSerDe();
+    serDe.initialize(configuration, properties);
+    tableInspector = (ArrayWritableObjectInspector) serDe.getObjectInspector();
+
+    structFields = ImmutableList.copyOf(

Review comment:
       Move `ImmutableList.copyOf(` to the next line and tidy the formatting.
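
       i.e.:

```java
structFields =
    ImmutableList.copyOf(inputColumnNames.stream()
        .map(tableInspector::getStructFieldRef)
        .collect(toImmutableList()));
```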

##########
File path: 
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbonDataInsertTableHandle.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.presto;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableMap;
+import io.prestosql.plugin.hive.HiveBucketProperty;
+import io.prestosql.plugin.hive.HiveColumnHandle;
+import io.prestosql.plugin.hive.HiveInsertTableHandle;
+import io.prestosql.plugin.hive.HiveStorageFormat;
+import io.prestosql.plugin.hive.LocationHandle;
+import io.prestosql.plugin.hive.metastore.HivePageSinkMetadata;
+import io.prestosql.spi.connector.ConnectorInsertTableHandle;
+
+import static java.util.Objects.requireNonNull;
+
+public class CarbonDataInsertTableHandle extends HiveInsertTableHandle 
implements
+    ConnectorInsertTableHandle {
+
+  private final Map<String, String> additionalConf;
+
+  @JsonCreator public CarbonDataInsertTableHandle(@JsonProperty("schemaName") 
String schemaName,

Review comment:
       Move the first parameter to the next line.
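
       i.e.:

```java
@JsonCreator
public CarbonDataInsertTableHandle(
    @JsonProperty("schemaName") String schemaName,
    // ...the remaining @JsonProperty parameters, one per line
```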

##########
File path: 
integration/presto/src/main/prestosql/org/apache/carbondata/presto/CarbondataConnectorFactory.java
##########
@@ -214,4 +226,8 @@ private static void setCarbonEnum() throws Exception {
     hiveStorageFormats[src.length] = (HiveStorageFormat) instance;
     values.set(null, hiveStorageFormats);
   }
-}
\ No newline at end of file
+
+  @Override public ConnectorHandleResolver getHandleResolver() {

Review comment:
       Move `@Override` to the previous line, on its own line.
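
       i.e.:

```java
@Override
public ConnectorHandleResolver getHandleResolver() {
  // body unchanged
}
```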




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

