This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e10018  [CARBONDATA-3865] Implementation of delete/update feature in carbondata SDK.
5e10018 is described below

commit 5e100188c4b3c4ea3e78be186a6cae7358a4ac46
Author: Karan-c980 <[email protected]>
AuthorDate: Wed Apr 15 00:37:42 2020 +0530

    [CARBONDATA-3865] Implementation of delete/update feature in carbondata SDK.
    
    Why is this PR needed?
    Currently the carbondata SDK does not provide a delete/update feature.
    This PR enables the carbondata SDK to delete and update records in
    carbondata files.
    
    What changes were proposed in this PR?
    With this PR, the carbondata SDK supports delete and update operations,
    as sketched in the examples below.
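    
    A rough usage sketch of the new CarbonIUD API added in this commit (the
    table path and filter values are placeholders; see CarbonIUDTest.java in
    the diff below for complete examples):
    
        import org.apache.carbondata.sdk.file.CarbonIUD;
    
        // Queue a delete (rows where age = 1) and an update (set name = "robot2"
        // where age = 2) on the same table path; both run when commit() is called.
        CarbonIUD.getInstance()
            .delete("/path/to/table", "age", "1")
            .update("/path/to/table", "age", "2", "name", "robot2")
            .commit();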
    
    This closes #3834
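    
    The API also accepts filter expressions built from the existing classes in
    org.apache.carbondata.core.scan.expression (sketch only; imports from that
    package and java.util are omitted, and the path/values are illustrative):
    
        Expression filter = new LessThanExpression(
            new ColumnExpression("age", DataTypes.INT),
            new LiteralExpression("3", DataTypes.INT));
        // delete the matching rows, or rewrite selected columns for them
        CarbonIUD.getInstance().delete("/path/to/table", filter);
        Map<String, String> newValues = new HashMap<>();
        newValues.put("name", "robot");
        CarbonIUD.getInstance().update("/path/to/table", filter, newValues);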
---
 .../scan/result/vector/CarbonColumnarBatch.java    |   4 +
 .../hadoop/api/CarbonFileInputFormat.java          |  77 ++++
 .../hadoop/api/CarbonTableOutputFormat.java        |  55 +++
 .../hadoop/util/CarbonVectorizedRecordReader.java  |  15 +-
 .../org/apache/carbondata/sdk/file/CarbonIUD.java  | 394 ++++++++++++++++
 .../apache/carbondata/sdk/file/CarbonIUDTest.java  | 504 +++++++++++++++++++++
 6 files changed, 1046 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
index 634ca7a..1019c35 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
@@ -40,6 +40,10 @@ public class CarbonColumnarBatch {
     this.filteredRows = filteredRows;
   }
 
+  public boolean[] getFilteredRows() {
+    return filteredRows;
+  }
+
   public int getBatchSize() {
     return batchSize;
   }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 9c599ef..7922244 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -17,15 +17,23 @@
 
 package org.apache.carbondata.hadoop.api;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.index.IndexFilter;
@@ -43,6 +51,8 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -172,6 +182,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
         carbonFiles = getAllCarbonDataFiles(carbonTable.getTablePath());
       }
 
+      List<String> allDeleteDeltaFiles = getAllDeleteDeltaFiles(carbonTable.getTablePath());
       for (CarbonFile carbonFile : carbonFiles) {
         // Segment id is set to null because SDK does not write carbondata files with respect
         // to segments. So no specific name is present for this load.
@@ -184,10 +195,15 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
         info.setBlockSize(carbonFile.getLength());
         info.setVersionNumber(split.getVersion().number());
         info.setUseMinMaxForPruning(false);
+        if (CollectionUtils.isNotEmpty(allDeleteDeltaFiles)) {
+          split.setDeleteDeltaFiles(
+              getDeleteDeltaFiles(carbonFile.getAbsolutePath(), allDeleteDeltaFiles));
+        }
         splits.add(split);
       }
       splits.sort(Comparator.comparing(o -> ((CarbonInputSplit) o).getFilePath()));
     }
+
     setAllColumnProjectionIfNotConfigured(job, carbonTable);
     return splits;
   }
@@ -239,6 +255,67 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
     List<CarbonInputSplit> dataBlocksOfSegment = getDataBlocksOfSegment(job, carbonTable,
         indexFilter, validSegments, new ArrayList<>(), new ArrayList<>());
     numBlocks = dataBlocksOfSegment.size();
+    List<String> allDeleteDeltaFiles = getAllDeleteDeltaFiles(carbonTable.getTablePath());
+    if (CollectionUtils.isNotEmpty(allDeleteDeltaFiles)) {
+      for (CarbonInputSplit split : dataBlocksOfSegment) {
+        split.setDeleteDeltaFiles(getDeleteDeltaFiles(split.getFilePath(), allDeleteDeltaFiles));
+      }
+    }
     return new LinkedList<>(dataBlocksOfSegment);
   }
+
+  private List<String> getAllDeleteDeltaFiles(String path) {
+    List<String> deltaFiles = null;
+    try (Stream<Path> walk = Files.walk(Paths.get(path))) {
+      deltaFiles = walk.map(x -> x.toString())
+          .filter(f -> f.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return deltaFiles;
+  }
+
+  private String[] getDeleteDeltaFiles(String segmentFilePath, List<String> allDeleteDeltaFiles) {
+    List<String> deleteDeltaFiles = new ArrayList<>();
+    String segmentFileName = null;
+    String[] pathElements = segmentFilePath.split(Pattern.quote(File.separator));
+    if (ArrayUtils.isNotEmpty(pathElements)) {
+      segmentFileName = pathElements[pathElements.length - 1];
+    }
+
+    /* DeleteDeltaFiles for a segment will be mapped on the basis of name.
+       So extract the expectedDeleteDeltaFileName by removing the
+       compressor name and part number from the segmentFileName.
+    */
+
+    String expectedDeleteDeltaFileName = null;
+    if (segmentFileName != null && !segmentFileName.isEmpty()) {
+      int startIndex = segmentFileName.indexOf(CarbonCommonConstants.HYPHEN);
+      int endIndex = segmentFileName.indexOf(CarbonCommonConstants.UNDERSCORE);
+      if (startIndex != -1 && endIndex != -1) {
+        expectedDeleteDeltaFileName = segmentFileName.substring(startIndex + 1, endIndex);
+      }
+    }
+    String deleteDeltaFullFileName = null;
+    for (String deltaFile : allDeleteDeltaFiles) {
+      String[] deleteDeltaPathElements = deltaFile.split(Pattern.quote(File.separator));
+      if (ArrayUtils.isNotEmpty(deleteDeltaPathElements)) {
+        deleteDeltaFullFileName = deleteDeltaPathElements[deleteDeltaPathElements.length - 1];
+      }
+      int underScoreIndex = deleteDeltaFullFileName.indexOf(CarbonCommonConstants.UNDERSCORE);
+      if (underScoreIndex != -1) {
+        String deleteDeltaFileName = deleteDeltaFullFileName.substring(0, underScoreIndex);
+        if (deleteDeltaFileName.equals(expectedDeleteDeltaFileName)) {
+          deleteDeltaFiles.add(deltaFile);
+        }
+      }
+    }
+    String[] deleteDeltaFile = new String[deleteDeltaFiles.size()];
+    int currentIndex = 0;
+    for (String deltaFile : deleteDeltaFiles) {
+      deleteDeltaFile[currentIndex++] = deltaFile;
+    }
+    return deleteDeltaFile;
+  }
 }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index f52030c..1ddd3c6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -17,15 +17,19 @@
 
 package org.apache.carbondata.hadoop.api;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
 
 import org.apache.carbondata.common.exceptions.DeprecatedFeatureException;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -36,12 +40,16 @@ import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.metadata.datatype.StructType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
+import org.apache.carbondata.core.mutate.TupleIdEnum;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.DataLoadMetrics;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
 import org.apache.carbondata.processing.loading.ComplexDelimitersEnum;
 import org.apache.carbondata.processing.loading.DataLoadExecutor;
@@ -550,4 +558,51 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
       super.close(taskAttemptContext);
     }
   }
+
+  public static RecordWriter<NullWritable, ObjectArrayWritable> getDeleteDeltaRecordWriter(
+      String path) {
+    return (new RecordWriter<NullWritable, ObjectArrayWritable>() {
+      private final ArrayList<String> tupleId = new ArrayList<>();
+
+      @Override
+      public void write(NullWritable aVoid, ObjectArrayWritable objects) {
+        this.tupleId.add((String) objects.get()[0]);
+      }
+
+      @Override
+      public void close(TaskAttemptContext taskAttemptContext) throws IOException {
+        Map<String, DeleteDeltaBlockDetails> blockToDeleteDeltaBlockMapping = new HashMap<>();
+        DeleteDeltaBlockDetails blockDetails;
+        String blockName;
+        for (String tuple : tupleId) {
+          blockName = CarbonUpdateUtil.getBlockName(
+              (tuple.split(Pattern.quote(File.separator))[TupleIdEnum.BLOCK_ID.getTupleIdIndex()]));
+
+          if (!blockToDeleteDeltaBlockMapping.containsKey(blockName)) {
+            blockDetails = new DeleteDeltaBlockDetails(blockName);
+            blockToDeleteDeltaBlockMapping.put(blockName, blockDetails);
+          }
+          blockDetails = blockToDeleteDeltaBlockMapping.get(blockName);
+          try {
+            blockDetails.addBlocklet(
+                CarbonUpdateUtil.getRequiredFieldFromTID(tuple, TupleIdEnum.BLOCKLET_ID),
+                CarbonUpdateUtil.getRequiredFieldFromTID(tuple, TupleIdEnum.OFFSET), Integer
+                    .parseInt(
+                        CarbonUpdateUtil.getRequiredFieldFromTID(tuple, TupleIdEnum.PAGE_ID)));
+          } catch (Exception e) {
+            LOG.error(e.getMessage());
+            throw new RuntimeException(e);
+          }
+        }
+        for (Map.Entry<String, DeleteDeltaBlockDetails> block : blockToDeleteDeltaBlockMapping
+            .entrySet()) {
+          String deleteDeltaPath = CarbonUpdateUtil.getDeleteDeltaFilePath(path, block.getKey(),
+              String.valueOf(System.currentTimeMillis()));
+          CarbonDeleteDeltaWriterImpl deleteDeltaWriter =
+              new CarbonDeleteDeltaWriterImpl(deleteDeltaPath);
+          deleteDeltaWriter.write(block.getValue());
+        }
+      }
+    });
+  }
 }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
index 491094d..2d6626a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
@@ -58,6 +58,7 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
 
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(CarbonVectorizedRecordReader.class.getName());
+  private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
 
   private CarbonColumnarBatch carbonColumnarBatch;
 
@@ -67,6 +68,8 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
 
   private int numBatched = 0;
 
+  private int rowId = 0;
+
   private AbstractDetailQueryResultIterator iterator;
 
   private final QueryModel queryModel;
@@ -126,6 +129,10 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
     if (batchIdx >= numBatched) {
       if (!nextBatch()) return false;
     }
+    while (rowId < carbonColumnarBatch.getRowCounter() && carbonColumnarBatch
+        .getFilteredRows()[rowId]) {
+      rowId++;
+    }
     ++batchIdx;
     return true;
   }
@@ -138,6 +145,7 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
       if (numBatched == 0) {
         return nextBatch();
       }
+      rowId = 0;
       batchIdx = 0;
       return true;
     }
@@ -186,7 +194,7 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
       }
       carbonColumnarBatch = new CarbonColumnarBatch(vectors,
           CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
-          new boolean[] {});
+          new boolean[DEFAULT_BATCH_SIZE]);
     }
   }
 
@@ -203,7 +211,7 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
         row[i] = row[projectionMapping.get(i)];
       } else {
         Object data = carbonColumnarBatch.columnVectors[projectionMapping.get(i)]
-                .getData(batchIdx - 1);
+                .getData(rowId);
         if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.STRING
                 || carbonColumnarBatch.columnVectors[i].getType() == DataTypes.VARCHAR) {
           if (data == null) {
@@ -219,10 +227,11 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
           }
         } else {
           row[i] = carbonColumnarBatch.columnVectors[projectionMapping.get(i)]
-                  .getData(batchIdx - 1);
+                  .getData(rowId);
         }
       }
     }
+    rowId++;
     return row;
   }
 
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonIUD.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonIUD.java
new file mode 100644
index 0000000..15d2202
--- /dev/null
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonIUD.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.Field;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+
+public class CarbonIUD {
+
+  private final Configuration configuration;
+  private final Map<String, Map<String, Set<String>>> filterColumnToValueMappingForDelete;
+  private final Map<String, Map<String, Set<String>>> filterColumnToValueMappingForUpdate;
+  private final Map<String, Map<String, String>> updateColumnToValueMapping;
+
+  private CarbonIUD(Configuration conf) {
+    configuration = conf;
+    filterColumnToValueMappingForDelete = new HashMap<>();
+    filterColumnToValueMappingForUpdate = new HashMap<>();
+    updateColumnToValueMapping = new HashMap<>();
+  }
+
+  /**
+   * @return CarbonIUD object
+   */
+  public static CarbonIUD getInstance(Configuration conf) {
+    return new CarbonIUD(conf);
+  }
+
+  public static CarbonIUD getInstance() {
+    return new CarbonIUD(null);
+  }
+
+  /**
+   * @param path   is the table path on which delete is performed
+   * @param column is the columnName on which records have to be deleted
+   * @param value  of column on which the records have to be deleted
+   * @return CarbonIUD object
+   */
+  public CarbonIUD delete(String path, String column, String value) {
+    prepareDelete(path, column, value, filterColumnToValueMappingForDelete);
+    return this;
+  }
+
+  /**
+   * This method deletes the rows at given path by applying the filterExpression
+   *
+   * @param path             is the table path on which delete is performed
+   * @param filterExpression is the expression to delete the records
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void delete(String path, Expression filterExpression)
+      throws IOException, InterruptedException {
+    CarbonReader reader = CarbonReader.builder(path)
+        .projection(new String[] { CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID })
+        .withHadoopConf(configuration)
+        .filter(filterExpression).build();
+
+    RecordWriter<NullWritable, ObjectArrayWritable> deleteDeltaWriter =
+        CarbonTableOutputFormat.getDeleteDeltaRecordWriter(path);
+    ObjectArrayWritable writable = new ObjectArrayWritable();
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      writable.set(row);
+      deleteDeltaWriter.write(NullWritable.get(), writable);
+    }
+    deleteDeltaWriter.close(null);
+    reader.close();
+  }
+
+  /**
+   * Calling this method will start the execution of delete process
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void closeDelete() throws IOException, InterruptedException {
+    for (Map.Entry<String, Map<String, Set<String>>> path : this.filterColumnToValueMappingForDelete
+        .entrySet()) {
+      deleteExecution(path.getKey());
+    }
+  }
+
+  /**
+   * @param path      is the table path on which update is performed
+   * @param column    is the columnName on which records have to be updated
+   * @param value     of column on which the records have to be updated
+   * @param updColumn is the name of updatedColumn
+   * @param updValue  is the value of updatedColumn
+   * @return CarbonUID
+   */
+  public CarbonIUD update(String path, String column, String value, String updColumn,
+      String updValue) {
+    prepareUpdate(path, column, value, updColumn, updValue);
+    return this;
+  }
+
+  /**
+   * This method updates the rows at given path by applying the filterExpression
+   *
+   * @param path                        is the table path on which update is performed.
+   * @param filterExpression            is the expression object to update the records
+   * @param updatedColumnToValueMapping contains the mapping of updatedColumns to updatedValues
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws InvalidLoadOptionException
+   */
+  public void update(String path, Expression filterExpression,
+      Map<String, String> updatedColumnToValueMapping)
+      throws IOException, InterruptedException, InvalidLoadOptionException {
+    List<String> indexFiles = getCarbonIndexFile(path);
+    Schema schema = CarbonSchemaReader.readSchema(indexFiles.get(0)).asOriginOrder();
+    Field[] fields = schema.getFields();
+    String[] projectionColumns = new String[fields.length + 1];
+    for (int i = 0; i < fields.length; i++) {
+      projectionColumns[i] = (fields[i].getFieldName());
+    }
+    projectionColumns[projectionColumns.length - 1] =
+        CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID;
+    CarbonWriter writer =
+        CarbonWriter.builder().outputPath(path)
+            .withHadoopConf(configuration)
+            .withCsvInput(schema)
+            .writtenBy("CarbonIUD")
+            .build();
+    CarbonReader reader =
+        CarbonReader.builder(path).projection(projectionColumns)
+            .withHadoopConf(configuration)
+            .filter(filterExpression).build();
+    RecordWriter<NullWritable, ObjectArrayWritable> deleteDeltaWriter =
+        CarbonTableOutputFormat.getDeleteDeltaRecordWriter(path);
+    ObjectArrayWritable writable = new ObjectArrayWritable();
+
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      writable.set(Arrays.copyOfRange(row, row.length - 1, row.length));
+      for (Map.Entry<String, String> column : updatedColumnToValueMapping.entrySet()) {
+        row[getColumnIndex(fields, column.getKey())] = column.getValue();
+      }
+      writer.write(Arrays.copyOfRange(row, 0, row.length - 1));
+      deleteDeltaWriter.write(NullWritable.get(), writable);
+    }
+    deleteDeltaWriter.close(null);
+    writer.close();
+    reader.close();
+  }
+
+  /**
+   * Calling this method will start the execution of update process
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws InvalidLoadOptionException
+   */
+  private void closeUpdate() throws IOException, InterruptedException, InvalidLoadOptionException {
+    for (Map.Entry<String, Map<String, Set<String>>> path : this.filterColumnToValueMappingForUpdate
+        .entrySet()) {
+      if (this.updateColumnToValueMapping.containsKey(path.getKey())) {
+        updateExecution(path.getKey());
+      }
+    }
+  }
+
+  /**
+   * Calling this method will execute delete and update operation in one statement.
+   * This method will first perform delete operation and then update operation
+   * (update operation is only performed if the rows to be
+   * updated are not deleted while delete operation)
+   * For eg:
+   * CarbonIUD.getInstance().delete().delete().update()
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws InvalidLoadOptionException
+   */
+  public void commit() throws IOException, InterruptedException, InvalidLoadOptionException {
+    if (filterColumnToValueMappingForDelete.size() != 0) {
+      closeDelete();
+    }
+    if (filterColumnToValueMappingForUpdate.size() != 0 && !ifRowsDeleted()) {
+      closeUpdate();
+    }
+  }
+
+  private void updateExecution(String path)
+      throws IOException, InterruptedException, InvalidLoadOptionException {
+    Expression filterExpression =
+        getExpression(path, this.filterColumnToValueMappingForUpdate.get(path));
+    update(path, filterExpression, this.updateColumnToValueMapping.get(path));
+  }
+
+  private void deleteExecution(String path) throws IOException, InterruptedException {
+    Expression filterExpression =
+        getExpression(path, this.filterColumnToValueMappingForDelete.get(path));
+    delete(path, filterExpression);
+  }
+
+  /**
+   * This method prepares the column to value mapping for update operation
+   * for eg: UPDATE updColumn = updValue WHERE column = value
+   */
+  private void prepareUpdate(String path, String column, String value, String updColumn,
+      String updValue) {
+    prepareDelete(path, column, value, filterColumnToValueMappingForUpdate);
+    updColumn = updColumn.toLowerCase().trim();
+    if (this.updateColumnToValueMapping.containsKey(path)) {
+      this.updateColumnToValueMapping.get(path).put(updColumn, updValue);
+    } else {
+      Map<String, String> columnToValue = new HashMap<>();
+      columnToValue.put(updColumn, updValue);
+      this.updateColumnToValueMapping.put(path, columnToValue);
+    }
+  }
+
+  /**
+   * This method prepares the column to value mapping for delete operation
+   * for eg: DELETE WHERE column = value
+   */
+  private void prepareDelete(String path, String column, String value,
+      Map<String, Map<String, Set<String>>> filterColumnToValueMapping) {
+    column = column.toLowerCase().trim();
+    if (filterColumnToValueMapping.containsKey(path)) {
+      Map<String, Set<String>> columnToValueMapping = filterColumnToValueMapping.get(path);
+      if (columnToValueMapping.containsKey(column)) {
+        columnToValueMapping.get(column).add(value);
+      } else {
+        Set<String> columnValues = new HashSet<>();
+        columnValues.add(value);
+        columnToValueMapping.put(column, columnValues);
+      }
+    } else {
+      Map<String, Set<String>> columnToValueMapping = new HashMap<>();
+      Set<String> columnValues = new HashSet<>();
+      columnValues.add(value);
+      columnToValueMapping.put(column, columnValues);
+      filterColumnToValueMapping.put(path, columnToValueMapping);
+    }
+  }
+
+  /**
+   * This method will convert the given columnToValue mapping into expression object
+   * If columnToValue mapping have following entries:
+   * name --> {karan, kunal, vikram}
+   * age --> {24}
+   * the expression will look like this for above entries:
+   * ((name = karan || name = kunal || name = vikram) && (age = 24))
+   */
+  private Expression getExpression(String path, Map<String, Set<String>> columnToValueMapping)
+      throws IOException {
+    List<String> indexFiles = getCarbonIndexFile(path);
+    Schema schema = CarbonSchemaReader.readSchema(indexFiles.get(0)).asOriginOrder();
+    Field[] fields = schema.getFields();
+    List<Expression> listOfExpressions = new ArrayList<>();
+    for (Map.Entry<String, Set<String>> column : columnToValueMapping.entrySet()) {
+      DataType dataType = getColumnDataType(fields, column.getKey());
+      List<Expression> listOfOrExpressions = new ArrayList<>();
+      for (String value : column.getValue()) {
+        listOfOrExpressions.add(
+            new EqualToExpression(new ColumnExpression(column.getKey(), dataType),
+                new LiteralExpression(value, dataType)));
+      }
+      Expression OrFilterExpression = null;
+      if (listOfOrExpressions.size() > 0) {
+        OrFilterExpression = listOfOrExpressions.get(0);
+      }
+      for (int i = 1; i < listOfOrExpressions.size(); i++) {
+        OrFilterExpression = new OrExpression(OrFilterExpression, listOfOrExpressions.get(i));
+      }
+      listOfExpressions.add(OrFilterExpression);
+    }
+    Expression filterExpression = null;
+    if (listOfExpressions.size() > 0) {
+      filterExpression = listOfExpressions.get(0);
+    }
+    for (int i = 1; i < listOfExpressions.size(); i++) {
+      filterExpression = new AndExpression(filterExpression, listOfExpressions.get(i));
+    }
+    return filterExpression;
+  }
+
+  private int getColumnIndex(Field[] fields, String column) {
+    int index = -1;
+    for (Field field : fields) {
+      if (field.getFieldName().equals(column)) {
+        index = field.getSchemaOrdinal();
+        break;
+      }
+    }
+    if (index == -1) {
+      throw new RuntimeException("ColumnName doesn't exists");
+    }
+    return index;
+  }
+
+  private List<String> getCarbonIndexFile(String path) {
+    List<String> indexFiles = null;
+    try (Stream<Path> walk = Files.walk(Paths.get(path))) {
+      indexFiles = walk.map(x -> x.toString())
+          .filter(f -> f.endsWith(CarbonCommonConstants.UPDATE_INDEX_FILE_EXT))
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    if (indexFiles == null || indexFiles.size() < 1) {
+      throw new RuntimeException("Carbon index file does not exists.");
+    }
+    return indexFiles;
+  }
+
+  private DataType getColumnDataType(Field[] fields, String column) {
+    DataType type = null;
+    for (Field field : fields) {
+      if (field.getFieldName().equals(column)) {
+        type = field.getDataType();
+        break;
+      }
+    }
+    if (null == type) {
+      throw new RuntimeException("ColumnName doesn't exists");
+    }
+    if (type.isComplexType()) {
+      throw new RuntimeException("IUD operation not supported for Complex data types");
+    }
+    return type;
+  }
+
+  private boolean ifRowsDeleted() {
+    for (Map.Entry<String, Map<String, Set<String>>> path : this.filterColumnToValueMappingForUpdate
+        .entrySet()) {
+      if (!this.filterColumnToValueMappingForDelete.containsKey(path.getKey())) {
+        return false;
+      } else {
+        for (Map.Entry<String, Set<String>> column : this.filterColumnToValueMappingForDelete
+            .get(path.getKey()).entrySet()) {
+          if (!this.filterColumnToValueMappingForUpdate.get(path.getKey())
+              .containsKey(column.getKey())) {
+            return false;
+          } else {
+            for (String value : this.filterColumnToValueMappingForUpdate.get(path.getKey())
+                .get(column.getKey())) {
+              if (!column.getValue().contains(value)) {
+                return false;
+              }
+            }
+          }
+        }
+      }
+    }
+    return true;
+  }
+}
diff --git a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonIUDTest.java b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonIUDTest.java
new file mode 100644
index 0000000..ecb4249
--- /dev/null
+++ b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonIUDTest.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.Field;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.*;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonIUDTest {
+
+  @Test
+  public void testDelete() throws Exception {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+    TestUtil.writeFilesAndVerify(10, new Schema(fields), path);
+    CarbonIUD.getInstance().delete(path, "age", "0").commit();
+    CarbonIUD.getInstance().delete(path, "age", "1").delete(path, "name", "robot1").commit();
+    CarbonIUD.getInstance().delete(path, "age", "2").delete(path, "name", "robot2")
+        .delete(path, "doubleField", "1.0").commit();
+    CarbonIUD.getInstance().delete(path, "name", "robot3").delete(path, "name", "robot4")
+        .delete(path, "name", "robot5").commit();
+    CarbonIUD.getInstance().delete(path, "name", "robot6").delete(path, "name", "robot7")
+        .delete(path, "name", "robot8").delete(path, "age", "6").delete(path, "age", "7").commit();
+
+    CarbonReader reader =
+        CarbonReader.builder(path).projection(new String[] { "name", "age", "doubleField" })
+            .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      assert (((String) row[0]).contains("robot8") || ((String) row[0]).contains("robot9"));
+      assert (((int) (row[1])) > 7);
+      assert ((double) row[2] > 3.5);
+      i++;
+    }
+    Assert.assertEquals(i, 2);
+    reader.close();
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testDeleteWithConditionalExpressions() throws Exception {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+    TestUtil.writeFilesAndVerify(20, new Schema(fields), path);
+    ColumnExpression columnExpression1 = new ColumnExpression("age", DataTypes.INT);
+    LessThanExpression lessThanExpression =
+        new LessThanExpression(columnExpression1, new LiteralExpression("3", DataTypes.INT));
+    CarbonIUD.getInstance().delete(path, lessThanExpression);
+
+    ColumnExpression columnExpression2 = new ColumnExpression("age", DataTypes.INT);
+    GreaterThanExpression greaterThanExpression =
+        new GreaterThanExpression(columnExpression2, new LiteralExpression("16", DataTypes.INT));
+    CarbonIUD.getInstance().delete(path, greaterThanExpression);
+
+    ColumnExpression columnExpression3 = new ColumnExpression("age", DataTypes.INT);
+    LessThanEqualToExpression lessThanEqualToExpression =
+        new LessThanEqualToExpression(columnExpression3, new LiteralExpression("5", DataTypes.INT));
+    CarbonIUD.getInstance().delete(path, lessThanEqualToExpression);
+
+    ColumnExpression columnExpression4 = new ColumnExpression("age", DataTypes.INT);
+    GreaterThanEqualToExpression greaterThanEqualToExpression =
+        new GreaterThanEqualToExpression(columnExpression4,
+            new LiteralExpression("15", DataTypes.INT));
+    CarbonIUD.getInstance().delete(path, greaterThanEqualToExpression);
+
+    CarbonReader reader =
+        CarbonReader.builder(path).projection(new String[] { "name", "age", "doubleField" })
+            .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      assert (!((int) row[1] <= 5 || (int) row[1] >= 15));
+      i++;
+    }
+    Assert.assertEquals(i, 9);
+    reader.close();
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testDeleteWithAndFilter() throws Exception {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+    TestUtil.writeFilesAndVerify(20, new Schema(fields), path);
+
+    ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
+    LessThanExpression lessThanExpression1 =
+        new LessThanExpression(columnExpression, new LiteralExpression("3.5", DataTypes.DOUBLE));
+
+    ColumnExpression columnExpression2 = new ColumnExpression("age", DataTypes.INT);
+    LessThanExpression lessThanExpression2 =
+        new LessThanExpression(columnExpression2, new LiteralExpression("4", DataTypes.INT));
+
+    AndExpression andExpression = new AndExpression(lessThanExpression1, lessThanExpression2);
+    CarbonIUD.getInstance().delete(path, andExpression);
+
+    CarbonReader reader =
+        CarbonReader.builder(path).projection(new String[] { "name", "age", "doubleField" })
+            .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      assert (!((int) row[1] < 4));
+      i++;
+    }
+    Assert.assertEquals(i, 16);
+    reader.close();
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testDeleteWithORFilter() throws Exception {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+    TestUtil.writeFilesAndVerify(20, new Schema(fields), path);
+
+    ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE);
+    LessThanExpression lessThanExpression =
+        new LessThanExpression(columnExpression, new LiteralExpression("3.5", DataTypes.DOUBLE));
+
+    ColumnExpression columnExpression2 = new ColumnExpression("age", DataTypes.INT);
+    GreaterThanExpression greaterThanExpression =
+        new GreaterThanExpression(columnExpression2, new LiteralExpression("11", DataTypes.INT));
+
+    OrExpression orExpression = new OrExpression(lessThanExpression, greaterThanExpression);
+    CarbonIUD.getInstance().delete(path, orExpression);
+
+    CarbonReader reader =
+        CarbonReader.builder(path).projection(new String[] { "name", "age", "doubleField" })
+            .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      assert (!((int) row[1] > 11 || (double) row[2] < 3.5));
+      i++;
+    }
+    Assert.assertEquals(i, 5);
+    reader.close();
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testDeleteAtMultiplePaths() throws Exception {
+    String path1 = "./testWriteFiles1";
+    String path2 = "./testWriteFiles2";
+    FileUtils.deleteDirectory(new File(path1));
+    FileUtils.deleteDirectory(new File(path2));
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+    TestUtil.writeFilesAndVerify(10, new Schema(fields), path1);
+    TestUtil.writeFilesAndVerify(10, new Schema(fields), path2);
+    CarbonIUD.getInstance().delete(path1, "age", "2").delete(path2, "age", "3")
+        .delete(path1, "name", "robot2").delete(path2, "name", "robot3").commit();
+
+    CarbonReader reader1 =
+        CarbonReader.builder(path1).projection(new String[] { "name", "age", "doubleField" })
+            .build();
+
+    int i = 0;
+    while (reader1.hasNext()) {
+      Object[] row = (Object[]) reader1.readNextRow();
+      assert (!(((String) row[0]).contains("robot2")));
+      assert (((int) (row[1])) != 2);
+      assert ((double) row[2] != 1.0);
+      i++;
+    }
+    Assert.assertEquals(i, 9);
+    reader1.close();
+    FileUtils.deleteDirectory(new File(path1));
+
+    CarbonReader reader2 =
+        CarbonReader.builder(path2).projection(new String[] { "name", "age", "doubleField" })
+            .build();
+
+    i = 0;
+    while (reader2.hasNext()) {
+      Object[] row = (Object[]) reader2.readNextRow();
+      assert (!(((String) row[0]).contains("robot3")));
+      assert (((int) (row[1])) != 3);
+      assert ((double) row[2] != 1.5);
+      i++;
+    }
+    Assert.assertEquals(i, 9);
+    reader2.close();
+    FileUtils.deleteDirectory(new File(path2));
+  }
+
+  @Test
+  public void testDeleteInMultipleSegments() throws Exception {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+    TestUtil.writeFilesAndVerify(10, new Schema(fields), path);
+    TestUtil.writeFilesAndVerify(10, new Schema(fields), path);
+    TestUtil.writeFilesAndVerify(10, new Schema(fields), path);
+    CarbonIUD.getInstance().delete(path, "age", "2").delete(path, "name", "robot2").commit();
+
+    CarbonReader reader =
+        CarbonReader.builder(path).projection(new String[] { "name", "age", "doubleField" })
+            .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      assert (!(((String) row[0]).contains("robot2")));
+      assert (((int) (row[1])) != 2);
+      assert ((double) row[2] != 1.0);
+      i++;
+    }
+    Assert.assertEquals(i, 27);
+    reader.close();
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testUpdate() throws Exception {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+    TestUtil.writeFilesAndVerify(20, new Schema(fields), path);
+    CarbonIUD.getInstance().update(path, "age", "2", "age", "3").commit();
+    CarbonIUD.getInstance().update(path, "name", "robot2", "age", "3")
+        .update(path, "age", "12", "name", "robot13").commit();
+
+    ColumnExpression columnExpression1 = new ColumnExpression("age", DataTypes.INT);
+    EqualToExpression equalToExpression1 =
+        new EqualToExpression(columnExpression1, new LiteralExpression("3", DataTypes.INT));
+
+    CarbonReader reader1 =
+        CarbonReader.builder(path).projection(new String[] { "name", "age", "doubleField" })
+            .filter(equalToExpression1).build();
+
+    int i = 0;
+    while (reader1.hasNext()) {
+      Object[] row = (Object[]) reader1.readNextRow();
+      assert (((int) (row[1])) == 3);
+      i++;
+    }
+    Assert.assertEquals(i, 3);
+    reader1.close();
+
+    CarbonReader reader2 =
+        CarbonReader.builder(path).projection(new String[] { "name", "age", "doubleField" })
+            .build();
+
+    i = 0;
+    while (reader2.hasNext()) {
+      Object[] row = (Object[]) reader2.readNextRow();
+      assert (((int) (row[1])) != 2);
+      i++;
+    }
+    Assert.assertEquals(i, 20);
+    reader2.close();
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testDeleteOnUpdatedRows() throws Exception {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+    TestUtil.writeFilesAndVerify(20, new Schema(fields), path);
+    Map<String, String> updateColumnToValue = new HashMap<>();
+    updateColumnToValue.put("name", "robot");
+    updateColumnToValue.put("age", "24");
+
+    ColumnExpression columnExpression = new ColumnExpression("age", DataTypes.INT);
+    LessThanExpression lessThanExpression =
+        new LessThanExpression(columnExpression, new LiteralExpression("10", DataTypes.INT));
+    CarbonIUD.getInstance().update(path, lessThanExpression, updateColumnToValue);
+    CarbonIUD.getInstance().delete(path, "doubleField", "2.0").commit();
+
+    ColumnExpression columnExpression1 = new ColumnExpression("age", DataTypes.INT);
+    EqualToExpression equalToExpression =
+        new EqualToExpression(columnExpression1, new LiteralExpression("24", DataTypes.INT));
+
+    CarbonReader reader =
+        CarbonReader.builder(path).projection(new String[] { "name", "age", "doubleField" })
+            .filter(equalToExpression).build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      assert (((String) row[0]).contains("robot"));
+      assert (((int) (row[1])) == 24);
+      i++;
+    }
+    Assert.assertEquals(i, 9);
+    reader.close();
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testDeleteAndUpdateTogether() throws Exception {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+    TestUtil.writeFilesAndVerify(20, new Schema(fields), path);
+    CarbonIUD.getInstance().delete(path, "name", "robot0").delete(path, "name", "robot1")
+        .delete(path, "name", "robot2").delete(path, "age", "0").delete(path, "age", "1")
+        .update(path, "name", "robot1", "name", "Karan").update(path, "age", "1", "age", "0")
+        .commit();
+
+    CarbonReader reader =
+        CarbonReader.builder(path).projection(new String[] { "name", "age", "doubleField" })
+            .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      assert (!((String) row[0]).contains("Karan"));
+      assert (((int) (row[1])) > 1);
+      i++;
+    }
+    Assert.assertEquals(i, 18);
+    reader.close();
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testIUDOnDifferentDataType() throws Exception {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    Field[] fields = new Field[11];
+    fields[0] = new Field("stringField", DataTypes.STRING);
+    fields[1] = new Field("shortField", DataTypes.SHORT);
+    fields[2] = new Field("intField", DataTypes.INT);
+    fields[3] = new Field("longField", DataTypes.LONG);
+    fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+    fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+    fields[6] = new Field("dateField", DataTypes.DATE);
+    fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+    fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+    fields[9] = new Field("varcharField", DataTypes.VARCHAR);
+    fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
+    CarbonWriter writer =
+        CarbonWriter.builder().outputPath(path).withLoadOption("complex_delimiter_level_1", "#")
+            .withCsvInput(new Schema(fields)).writtenBy("IUDTest").build();
+    for (int i = 0; i < 10; i++) {
+      boolean boolValue = true;
+      String decimalValue = "12.345";
+      String dateValue = "2019-03-02";
+      if (i == 6) {
+        boolValue = false;
+      }
+      if (i == 7) {
+        decimalValue = "12.366";
+      }
+      if (i == 8) {
+        dateValue = "2019-03-03";
+      }
+      String[] row2 =
+          new String[] { "robot" + (i % 10), String.valueOf(i % 10000), String.valueOf(i),
+              String.valueOf(Long.MAX_VALUE - i), String.valueOf((double) i / 2),
+              String.valueOf(boolValue), dateValue, "2019-02-12 03:03:34", decimalValue,
+              "varchar" + (i % 10), "Hello#World#From#Carbon" };
+      writer.write(row2);
+    }
+    writer.close();
+    CarbonIUD.getInstance().delete(path, "stringField", "robot0").commit();
+    CarbonIUD.getInstance().delete(path, "shortField", "1").commit();
+    CarbonIUD.getInstance().delete(path, "intField", "2").commit();
+    CarbonIUD.getInstance().delete(path, "longField", "9223372036854775804").commit();
+    CarbonIUD.getInstance().delete(path, "doubleField", "2.0").commit();
+    CarbonIUD.getInstance().delete(path, "varcharField", "varchar5").commit();
+    CarbonIUD.getInstance().delete(path, "boolField", "false").commit();
+    CarbonIUD.getInstance().delete(path, "decimalField", "12.37").commit();
+    CarbonIUD.getInstance().delete(path, "dateField", "2019-03-03").commit();
+    CarbonIUD.getInstance().delete(path, "timeField", "2019-02-12 03:03:34").commit();
+
+    File[] indexFiles = new File(path).listFiles(new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        if (name == null) {
+          return false;
+        }
+        return name.endsWith("carbonindex");
+      }
+    });
+    if (indexFiles == null || indexFiles.length < 1) {
+      throw new RuntimeException("Carbon index file not exists.");
+    }
+    Schema schema = CarbonSchemaReader.readSchema(indexFiles[0].getAbsolutePath()).asOriginOrder();
+    String[] strings = new String[schema.getFields().length];
+    for (int i = 0; i < schema.getFields().length; i++) {
+      strings[i] = (schema.getFields())[i].getFieldName();
+    }
+    CarbonReader reader = CarbonReader.builder(path).projection(strings).build();
+    int i = 0;
+    while (reader.hasNext()) {
+      i++;
+    }
+    Assert.assertEquals(i, 0);
+    reader.close();
+    try {
+      CarbonIUD.getInstance().delete(path, "arrayfield", "{Hello,World,From,Carbon}").commit();
+    } catch (RuntimeException e) {
+      Assert.assertTrue(
+          e.getMessage().contains("IUD operation not supported for Complex data types"));
+    }
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testIUDOnComplexType() throws Exception {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("mapRecord", DataTypes.createMapType(DataTypes.STRING, DataTypes.STRING));
+    String mySchema =
+        "{ " + "  \"name\": \"address\", " + "  \"type\": \"record\", " + "  \"fields\": [ "
+            + "    { " + "      \"name\": \"name\", " + "      \"type\": \"string\" " + "    }, "
+            + "    { " + "      \"name\": \"age\", " + "      \"type\": \"int\" " + "    }, "
+            + "    { " + "      \"name\": \"mapRecord\", " + "      \"type\": { "
+            + "        \"type\": \"map\", " + "        \"values\": \"string\" " + "      } "
+            + "    } " + "  ] " + "} ";
+
+    String json =
+        "{\"name\":\"bob\", \"age\":10, \"mapRecord\": {\"street\": \"k-lane\", \"city\": \"bangalore\"}}";
+    org.apache.avro.Schema nn = new org.apache.avro.Schema.Parser().parse(mySchema);
+    GenericData.Record record = TestUtil.jsonToAvro(json, mySchema);
+    CarbonWriter writer =
+        CarbonWriter.builder().outputPath(path).withAvroInput(nn).writtenBy("CarbonIUDTest")
+            .build();
+    for (int i = 0; i < 10; i++) {
+      writer.write(record);
+    }
+    writer.close();
+    try {
+      CarbonIUD.getInstance()
+          .delete(path, "mapRecord", "{\"street\": \"k-lane\", \"city\": \"bangalore\"}").commit();
+    } catch (RuntimeException e) {
+      Assert.assertTrue(
+          e.getMessage().contains("IUD operation not supported for Complex data types"));
+    }
+    FileUtils.deleteDirectory(new File(path));
+  }
+}
