This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 5e10018 [CARBONDATA-3865] Implementation of delete/update feature in carbondata SDK.
5e10018 is described below
commit 5e100188c4b3c4ea3e78be186a6cae7358a4ac46
Author: Karan-c980 <[email protected]>
AuthorDate: Wed Apr 15 00:37:42 2020 +0530
[CARBONDATA-3865] Implementation of delete/update feature in carbondata SDK.
Why is this PR needed?
Currently the CarbonData SDK does not provide a delete/update feature.
This PR enables the CarbonData SDK to delete and update records in carbondata files.

What changes were proposed in this PR?
This PR adds delete/update (IUD) support to the CarbonData SDK.
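
A minimal usage sketch of the new SDK API (class and method names are taken from
the CarbonIUD class and CarbonIUDTest added below; the table path, column names
and values are only illustrative):

    // delete rows where age = 0
    CarbonIUD.getInstance().delete("/path/to/table", "age", "0").commit();
    // update rows where name = 'robot2', setting age to 3
    CarbonIUD.getInstance().update("/path/to/table", "name", "robot2", "age", "3").commit();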
This closes #3834
---
.../scan/result/vector/CarbonColumnarBatch.java | 4 +
.../hadoop/api/CarbonFileInputFormat.java | 77 ++++
.../hadoop/api/CarbonTableOutputFormat.java | 55 +++
.../hadoop/util/CarbonVectorizedRecordReader.java | 15 +-
.../org/apache/carbondata/sdk/file/CarbonIUD.java | 394 ++++++++++++++++
.../apache/carbondata/sdk/file/CarbonIUDTest.java | 504 +++++++++++++++++++++
6 files changed, 1046 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
index 634ca7a..1019c35 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
@@ -40,6 +40,10 @@ public class CarbonColumnarBatch {
this.filteredRows = filteredRows;
}
+ public boolean[] getFilteredRows() {
+ return filteredRows;
+ }
+
public int getBatchSize() {
return batchSize;
}
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 9c599ef..7922244 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -17,15 +17,23 @@
package org.apache.carbondata.hadoop.api;
+import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.index.IndexFilter;
@@ -43,6 +51,8 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
@@ -172,6 +182,7 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
carbonFiles = getAllCarbonDataFiles(carbonTable.getTablePath());
}
+ List<String> allDeleteDeltaFiles = getAllDeleteDeltaFiles(carbonTable.getTablePath());
for (CarbonFile carbonFile : carbonFiles) {
// Segment id is set to null because SDK does not write carbondata files with respect
// to segments. So no specific name is present for this load.
@@ -184,10 +195,15 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
info.setBlockSize(carbonFile.getLength());
info.setVersionNumber(split.getVersion().number());
info.setUseMinMaxForPruning(false);
+ if (CollectionUtils.isNotEmpty(allDeleteDeltaFiles)) {
+ split.setDeleteDeltaFiles(
+ getDeleteDeltaFiles(carbonFile.getAbsolutePath(), allDeleteDeltaFiles));
+ }
splits.add(split);
}
splits.sort(Comparator.comparing(o -> ((CarbonInputSplit) o).getFilePath()));
}
+
setAllColumnProjectionIfNotConfigured(job, carbonTable);
return splits;
}
@@ -239,6 +255,67 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
List<CarbonInputSplit> dataBlocksOfSegment = getDataBlocksOfSegment(job, carbonTable,
indexFilter, validSegments, new ArrayList<>(), new ArrayList<>());
numBlocks = dataBlocksOfSegment.size();
+ List<String> allDeleteDeltaFiles = getAllDeleteDeltaFiles(carbonTable.getTablePath());
+ if (CollectionUtils.isNotEmpty(allDeleteDeltaFiles)) {
+ for (CarbonInputSplit split : dataBlocksOfSegment) {
+ split.setDeleteDeltaFiles(getDeleteDeltaFiles(split.getFilePath(), allDeleteDeltaFiles));
+ }
+ }
return new LinkedList<>(dataBlocksOfSegment);
}
+
+ private List<String> getAllDeleteDeltaFiles(String path) {
+ List<String> deltaFiles = null;
+ try (Stream<Path> walk = Files.walk(Paths.get(path))) {
+ deltaFiles = walk.map(x -> x.toString())
+ .filter(f -> f.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return deltaFiles;
+ }
+
+ private String[] getDeleteDeltaFiles(String segmentFilePath, List<String> allDeleteDeltaFiles) {
+ List<String> deleteDeltaFiles = new ArrayList<>();
+ String segmentFileName = null;
+ String[] pathElements = segmentFilePath.split(Pattern.quote(File.separator));
+ if (ArrayUtils.isNotEmpty(pathElements)) {
+ segmentFileName = pathElements[pathElements.length - 1];
+ }
+
+ /* DeleteDeltaFiles for a segment will be mapped on the basis of name.
+ So extract the expectedDeleteDeltaFileName by removing the
+ compressor name and part number from the segmentFileName.
+ */
+
+ String expectedDeleteDeltaFileName = null;
+ if (segmentFileName != null && !segmentFileName.isEmpty()) {
+ int startIndex = segmentFileName.indexOf(CarbonCommonConstants.HYPHEN);
+ int endIndex = segmentFileName.indexOf(CarbonCommonConstants.UNDERSCORE);
+ if (startIndex != -1 && endIndex != -1) {
+ expectedDeleteDeltaFileName = segmentFileName.substring(startIndex + 1, endIndex);
+ }
+ }
+ String deleteDeltaFullFileName = null;
+ for (String deltaFile : allDeleteDeltaFiles) {
+ String[] deleteDeltaPathElements = deltaFile.split(Pattern.quote(File.separator));
+ if (ArrayUtils.isNotEmpty(deleteDeltaPathElements)) {
+ deleteDeltaFullFileName = deleteDeltaPathElements[deleteDeltaPathElements.length - 1];
+ }
+ int underScoreIndex = deleteDeltaFullFileName.indexOf(CarbonCommonConstants.UNDERSCORE);
+ if (underScoreIndex != -1) {
+ String deleteDeltaFileName = deleteDeltaFullFileName.substring(0, underScoreIndex);
+ if (deleteDeltaFileName.equals(expectedDeleteDeltaFileName)) {
+ deleteDeltaFiles.add(deltaFile);
+ }
+ }
+ }
+ String[] deleteDeltaFile = new String[deleteDeltaFiles.size()];
+ int currentIndex = 0;
+ for (String deltaFile : deleteDeltaFiles) {
+ deleteDeltaFile[currentIndex++] = deltaFile;
+ }
+ return deleteDeltaFile;
+ }
}
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index f52030c..1ddd3c6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -17,15 +17,19 @@
package org.apache.carbondata.hadoop.api;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
import org.apache.carbondata.common.exceptions.DeprecatedFeatureException;
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -36,12 +40,16 @@ import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.metadata.datatype.StructType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
+import org.apache.carbondata.core.mutate.TupleIdEnum;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.DataLoadMetrics;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
import org.apache.carbondata.processing.loading.ComplexDelimitersEnum;
import org.apache.carbondata.processing.loading.DataLoadExecutor;
@@ -550,4 +558,51 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
super.close(taskAttemptContext);
}
}
+
+ public static RecordWriter<NullWritable, ObjectArrayWritable> getDeleteDeltaRecordWriter(
+ String path) {
+ return (new RecordWriter<NullWritable, ObjectArrayWritable>() {
+ private final ArrayList<String> tupleId = new ArrayList<>();
+
+ @Override
+ public void write(NullWritable aVoid, ObjectArrayWritable objects) {
+ this.tupleId.add((String) objects.get()[0]);
+ }
+
+ @Override
+ public void close(TaskAttemptContext taskAttemptContext) throws IOException {
+ Map<String, DeleteDeltaBlockDetails> blockToDeleteDeltaBlockMapping = new HashMap<>();
+ DeleteDeltaBlockDetails blockDetails;
+ String blockName;
+ for (String tuple : tupleId) {
+ blockName = CarbonUpdateUtil.getBlockName(
+ (tuple.split(Pattern.quote(File.separator))[TupleIdEnum.BLOCK_ID.getTupleIdIndex()]));
+
+ if (!blockToDeleteDeltaBlockMapping.containsKey(blockName)) {
+ blockDetails = new DeleteDeltaBlockDetails(blockName);
+ blockToDeleteDeltaBlockMapping.put(blockName, blockDetails);
+ }
+ blockDetails = blockToDeleteDeltaBlockMapping.get(blockName);
+ try {
+ blockDetails.addBlocklet(
+ CarbonUpdateUtil.getRequiredFieldFromTID(tuple, TupleIdEnum.BLOCKLET_ID),
+ CarbonUpdateUtil.getRequiredFieldFromTID(tuple, TupleIdEnum.OFFSET), Integer
+ .parseInt(
+ CarbonUpdateUtil.getRequiredFieldFromTID(tuple, TupleIdEnum.PAGE_ID)));
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+ for (Map.Entry<String, DeleteDeltaBlockDetails> block : blockToDeleteDeltaBlockMapping
+ .entrySet()) {
+ String deleteDeltaPath = CarbonUpdateUtil.getDeleteDeltaFilePath(path, block.getKey(),
+ String.valueOf(System.currentTimeMillis()));
+ CarbonDeleteDeltaWriterImpl deleteDeltaWriter =
+ new CarbonDeleteDeltaWriterImpl(deleteDeltaPath);
+ deleteDeltaWriter.write(block.getValue());
+ }
+ }
+ });
+ }
}
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
index 491094d..2d6626a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonVectorizedRecordReader.java
@@ -58,6 +58,7 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
private static final Logger LOGGER =
LogServiceFactory.getLogService(CarbonVectorizedRecordReader.class.getName());
+ private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
private CarbonColumnarBatch carbonColumnarBatch;
@@ -67,6 +68,8 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
private int numBatched = 0;
+ private int rowId = 0;
+
private AbstractDetailQueryResultIterator iterator;
private final QueryModel queryModel;
@@ -126,6 +129,10 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
if (batchIdx >= numBatched) {
if (!nextBatch()) return false;
}
+ while (rowId < carbonColumnarBatch.getRowCounter() && carbonColumnarBatch
+ .getFilteredRows()[rowId]) {
+ rowId++;
+ }
++batchIdx;
return true;
}
@@ -138,6 +145,7 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
if (numBatched == 0) {
return nextBatch();
}
+ rowId = 0;
batchIdx = 0;
return true;
}
@@ -186,7 +194,7 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
}
carbonColumnarBatch = new CarbonColumnarBatch(vectors,
CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT,
- new boolean[] {});
+ new boolean[DEFAULT_BATCH_SIZE]);
}
}
@@ -203,7 +211,7 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
row[i] = row[projectionMapping.get(i)];
} else {
Object data = carbonColumnarBatch.columnVectors[projectionMapping.get(i)]
- .getData(batchIdx - 1);
+ .getData(rowId);
if (carbonColumnarBatch.columnVectors[i].getType() == DataTypes.STRING
|| carbonColumnarBatch.columnVectors[i].getType() == DataTypes.VARCHAR) {
if (data == null) {
@@ -219,10 +227,11 @@ public class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
}
} else {
row[i] = carbonColumnarBatch.columnVectors[projectionMapping.get(i)]
- .getData(batchIdx - 1);
+ .getData(rowId);
}
}
}
+ rowId++;
return row;
}
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonIUD.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonIUD.java
new file mode 100644
index 0000000..15d2202
--- /dev/null
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonIUD.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.Field;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+
+public class CarbonIUD {
+
+ private final Configuration configuration;
+ private final Map<String, Map<String, Set<String>>> filterColumnToValueMappingForDelete;
+ private final Map<String, Map<String, Set<String>>> filterColumnToValueMappingForUpdate;
+ private final Map<String, Map<String, String>> updateColumnToValueMapping;
+
+ private CarbonIUD(Configuration conf) {
+ configuration = conf;
+ filterColumnToValueMappingForDelete = new HashMap<>();
+ filterColumnToValueMappingForUpdate = new HashMap<>();
+ updateColumnToValueMapping = new HashMap<>();
+ }
+
+ /**
+ * @return CarbonIUD object
+ */
+ public static CarbonIUD getInstance(Configuration conf) {
+ return new CarbonIUD(conf);
+ }
+
+ public static CarbonIUD getInstance() {
+ return new CarbonIUD(null);
+ }
+
+ /**
+ * @param path is the table path on which delete is performed
+ * @param column is the columnName on which records have to be deleted
+ * @param value of column on which the records have to be deleted
+ * @return CarbonIUD object
+ */
+ public CarbonIUD delete(String path, String column, String value) {
+ prepareDelete(path, column, value, filterColumnToValueMappingForDelete);
+ return this;
+ }
+
+ /**
+ * This method deletes the rows at given path by applying the filterExpression
+ *
+ * @param path is the table path on which delete is performed
+ * @param filterExpression is the expression to delete the records
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void delete(String path, Expression filterExpression)
+ throws IOException, InterruptedException {
+ CarbonReader reader = CarbonReader.builder(path)
+ .projection(new String[] { CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID })
+ .withHadoopConf(configuration)
+ .filter(filterExpression).build();
+
+ RecordWriter<NullWritable, ObjectArrayWritable> deleteDeltaWriter =
+ CarbonTableOutputFormat.getDeleteDeltaRecordWriter(path);
+ ObjectArrayWritable writable = new ObjectArrayWritable();
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ writable.set(row);
+ deleteDeltaWriter.write(NullWritable.get(), writable);
+ }
+ deleteDeltaWriter.close(null);
+ reader.close();
+ }
+
+ /**
+ * Calling this method will start the execution of delete process
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private void closeDelete() throws IOException, InterruptedException {
+ for (Map.Entry<String, Map<String, Set<String>>> path : this.filterColumnToValueMappingForDelete
+ .entrySet()) {
+ deleteExecution(path.getKey());
+ }
+ }
+
+ /**
+ * @param path is the table path on which update is performed
+ * @param column is the columnName on which records have to be updated
+ * @param value of column on which the records have to be updated
+ * @param updColumn is the name of updatedColumn
+ * @param updValue is the value of updatedColumn
+ * @return CarbonIUD
+ */
+ public CarbonIUD update(String path, String column, String value, String updColumn,
+ String updValue) {
+ prepareUpdate(path, column, value, updColumn, updValue);
+ return this;
+ }
+
+ /**
+ * This method updates the rows at given path by applying the filterExpression
+ *
+ * @param path is the table path on which update is performed.
+ * @param filterExpression is the expression object to update the records
+ * @param updatedColumnToValueMapping contains the mapping of updatedColumns to updatedValues
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws InvalidLoadOptionException
+ */
+ public void update(String path, Expression filterExpression,
+ Map<String, String> updatedColumnToValueMapping)
+ throws IOException, InterruptedException, InvalidLoadOptionException {
+ List<String> indexFiles = getCarbonIndexFile(path);
+ Schema schema = CarbonSchemaReader.readSchema(indexFiles.get(0)).asOriginOrder();
+ Field[] fields = schema.getFields();
+ String[] projectionColumns = new String[fields.length + 1];
+ for (int i = 0; i < fields.length; i++) {
+ projectionColumns[i] = (fields[i].getFieldName());
+ }
+ projectionColumns[projectionColumns.length - 1] =
+ CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID;
+ CarbonWriter writer =
+ CarbonWriter.builder().outputPath(path)
+ .withHadoopConf(configuration)
+ .withCsvInput(schema)
+ .writtenBy("CarbonIUD")
+ .build();
+ CarbonReader reader =
+ CarbonReader.builder(path).projection(projectionColumns)
+ .withHadoopConf(configuration)
+ .filter(filterExpression).build();
+ RecordWriter<NullWritable, ObjectArrayWritable> deleteDeltaWriter =
+ CarbonTableOutputFormat.getDeleteDeltaRecordWriter(path);
+ ObjectArrayWritable writable = new ObjectArrayWritable();
+
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ writable.set(Arrays.copyOfRange(row, row.length - 1, row.length));
+ for (Map.Entry<String, String> column : updatedColumnToValueMapping.entrySet()) {
+ row[getColumnIndex(fields, column.getKey())] = column.getValue();
+ }
+ writer.write(Arrays.copyOfRange(row, 0, row.length - 1));
+ deleteDeltaWriter.write(NullWritable.get(), writable);
+ }
+ deleteDeltaWriter.close(null);
+ writer.close();
+ reader.close();
+ }
+
+ /**
+ * Calling this method will start the execution of update process
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws InvalidLoadOptionException
+ */
+ private void closeUpdate() throws IOException, InterruptedException, InvalidLoadOptionException {
+ for (Map.Entry<String, Map<String, Set<String>>> path : this.filterColumnToValueMappingForUpdate
+ .entrySet()) {
+ if (this.updateColumnToValueMapping.containsKey(path.getKey())) {
+ updateExecution(path.getKey());
+ }
+ }
+ }
+
+ /**
+ * Calling this method will execute delete and update operation in one statement.
+ * This method will first perform delete operation and then update operation
+ * (update operation is only performed if the rows to be
+ * updated are not deleted while delete operation)
+ * For eg:
+ * CarbonIUD.getInstance().delete().delete().update()
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws InvalidLoadOptionException
+ */
+ public void commit() throws IOException, InterruptedException, InvalidLoadOptionException {
+ if (filterColumnToValueMappingForDelete.size() != 0) {
+ closeDelete();
+ }
+ if (filterColumnToValueMappingForUpdate.size() != 0 && !ifRowsDeleted()) {
+ closeUpdate();
+ }
+ }
+
+ private void updateExecution(String path)
+ throws IOException, InterruptedException, InvalidLoadOptionException {
+ Expression filterExpression =
+ getExpression(path, this.filterColumnToValueMappingForUpdate.get(path));
+ update(path, filterExpression, this.updateColumnToValueMapping.get(path));
+ }
+
+ private void deleteExecution(String path) throws IOException, InterruptedException {
+ Expression filterExpression =
+ getExpression(path, this.filterColumnToValueMappingForDelete.get(path));
+ delete(path, filterExpression);
+ }
+
+ /**
+ * This method prepares the column to value mapping for update operation
+ * for eg: UPDATE updColumn = updValue WHERE column = value
+ */
+ private void prepareUpdate(String path, String column, String value, String updColumn,
+ String updValue) {
+ prepareDelete(path, column, value, filterColumnToValueMappingForUpdate);
+ updColumn = updColumn.toLowerCase().trim();
+ if (this.updateColumnToValueMapping.containsKey(path)) {
+ this.updateColumnToValueMapping.get(path).put(updColumn, updValue);
+ } else {
+ Map<String, String> columnToValue = new HashMap<>();
+ columnToValue.put(updColumn, updValue);
+ this.updateColumnToValueMapping.put(path, columnToValue);
+ }
+ }
+
+ /**
+ * This method prepares the column to value mapping for delete operation
+ * for eg: DELETE WHERE column = value
+ */
+ private void prepareDelete(String path, String column, String value,
+ Map<String, Map<String, Set<String>>> filterColumnToValueMapping) {
+ column = column.toLowerCase().trim();
+ if (filterColumnToValueMapping.containsKey(path)) {
+ Map<String, Set<String>> columnToValueMapping = filterColumnToValueMapping.get(path);
+ if (columnToValueMapping.containsKey(column)) {
+ columnToValueMapping.get(column).add(value);
+ } else {
+ Set<String> columnValues = new HashSet<>();
+ columnValues.add(value);
+ columnToValueMapping.put(column, columnValues);
+ }
+ } else {
+ Map<String, Set<String>> columnToValueMapping = new HashMap<>();
+ Set<String> columnValues = new HashSet<>();
+ columnValues.add(value);
+ columnToValueMapping.put(column, columnValues);
+ filterColumnToValueMapping.put(path, columnToValueMapping);
+ }
+ }
+
+ /**
+ * This method will convert the given columnToValue mapping into expression object
+ * If columnToValue mapping have following entries:
+ * name --> {karan, kunal, vikram}
+ * age --> {24}
+ * the expression will look like this for above entries:
+ * ((name = karan || name = kunal || name = vikram) && (age = 24))
+ */
+ private Expression getExpression(String path, Map<String, Set<String>> columnToValueMapping)
+ throws IOException {
+ List<String> indexFiles = getCarbonIndexFile(path);
+ Schema schema = CarbonSchemaReader.readSchema(indexFiles.get(0)).asOriginOrder();
+ Field[] fields = schema.getFields();
+ List<Expression> listOfExpressions = new ArrayList<>();
+ for (Map.Entry<String, Set<String>> column : columnToValueMapping.entrySet()) {
+ DataType dataType = getColumnDataType(fields, column.getKey());
+ List<Expression> listOfOrExpressions = new ArrayList<>();
+ for (String value : column.getValue()) {
+ listOfOrExpressions.add(
+ new EqualToExpression(new ColumnExpression(column.getKey(), dataType),
+ new LiteralExpression(value, dataType)));
+ }
+ Expression OrFilterExpression = null;
+ if (listOfOrExpressions.size() > 0) {
+ OrFilterExpression = listOfOrExpressions.get(0);
+ }
+ for (int i = 1; i < listOfOrExpressions.size(); i++) {
+ OrFilterExpression = new OrExpression(OrFilterExpression, listOfOrExpressions.get(i));
+ }
+ listOfExpressions.add(OrFilterExpression);
+ }
+ Expression filterExpression = null;
+ if (listOfExpressions.size() > 0) {
+ filterExpression = listOfExpressions.get(0);
+ }
+ for (int i = 1; i < listOfExpressions.size(); i++) {
+ filterExpression = new AndExpression(filterExpression, listOfExpressions.get(i));
+ }
+ return filterExpression;
+ }
+
+ private int getColumnIndex(Field[] fields, String column) {
+ int index = -1;
+ for (Field field : fields) {
+ if (field.getFieldName().equals(column)) {
+ index = field.getSchemaOrdinal();
+ break;
+ }
+ }
+ if (index == -1) {
+ throw new RuntimeException("ColumnName doesn't exists");
+ }
+ return index;
+ }
+
+ private List<String> getCarbonIndexFile(String path) {
+ List<String> indexFiles = null;
+ try (Stream<Path> walk = Files.walk(Paths.get(path))) {
+ indexFiles = walk.map(x -> x.toString())
+ .filter(f -> f.endsWith(CarbonCommonConstants.UPDATE_INDEX_FILE_EXT))
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ if (indexFiles == null || indexFiles.size() < 1) {
+ throw new RuntimeException("Carbon index file does not exists.");
+ }
+ return indexFiles;
+ }
+
+ private DataType getColumnDataType(Field[] fields, String column) {
+ DataType type = null;
+ for (Field field : fields) {
+ if (field.getFieldName().equals(column)) {
+ type = field.getDataType();
+ break;
+ }
+ }
+ if (null == type) {
+ throw new RuntimeException("ColumnName doesn't exists");
+ }
+ if (type.isComplexType()) {
+ throw new RuntimeException("IUD operation not supported for Complex data
types");
+ }
+ return type;
+ }
+
+ private boolean ifRowsDeleted() {
+ for (Map.Entry<String, Map<String, Set<String>>> path : this.filterColumnToValueMappingForUpdate
+ .entrySet()) {
+ if (!this.filterColumnToValueMappingForDelete.containsKey(path.getKey())) {
+ return false;
+ } else {
+ for (Map.Entry<String, Set<String>> column : this.filterColumnToValueMappingForDelete
+ .get(path.getKey()).entrySet()) {
+ if (!this.filterColumnToValueMappingForUpdate.get(path.getKey())
+ .containsKey(column.getKey())) {
+ return false;
+ } else {
+ for (String value : this.filterColumnToValueMappingForUpdate.get(path.getKey())
+ .get(column.getKey())) {
+ if (!column.getValue().contains(value)) {
+ return false;
+ }
+ }
+ }
+ }
+ }
+ }
+ return true;
+ }
+}
diff --git a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonIUDTest.java b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonIUDTest.java
new file mode 100644
index 0000000..ecb4249
--- /dev/null
+++ b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonIUDTest.java
@@ -0,0 +1,504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.Field;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.*;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonIUDTest {
+
+ @Test
+ public void testDelete() throws Exception {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+ Field[] fields = new Field[3];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+ fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+ TestUtil.writeFilesAndVerify(10, new Schema(fields), path);
+ CarbonIUD.getInstance().delete(path, "age", "0").commit();
+ CarbonIUD.getInstance().delete(path, "age", "1").delete(path, "name",
"robot1").commit();
+ CarbonIUD.getInstance().delete(path, "age", "2").delete(path, "name",
"robot2")
+ .delete(path, "doubleField", "1.0").commit();
+ CarbonIUD.getInstance().delete(path, "name", "robot3").delete(path,
"name", "robot4")
+ .delete(path, "name", "robot5").commit();
+ CarbonIUD.getInstance().delete(path, "name", "robot6").delete(path,
"name", "robot7")
+ .delete(path, "name", "robot8").delete(path, "age", "6").delete(path,
"age", "7").commit();
+
+ CarbonReader reader =
+ CarbonReader.builder(path).projection(new String[] { "name", "age",
"doubleField" })
+ .build();
+
+ int i = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ assert (((String) row[0]).contains("robot8") || ((String)
row[0]).contains("robot9"));
+ assert (((int) (row[1])) > 7);
+ assert ((double) row[2] > 3.5);
+ i++;
+ }
+ Assert.assertEquals(i, 2);
+ reader.close();
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+ @Test
+ public void testDeleteWithConditionalExpressions() throws Exception {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+ Field[] fields = new Field[3];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+ fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+ TestUtil.writeFilesAndVerify(20, new Schema(fields), path);
+ ColumnExpression columnExpression1 = new ColumnExpression("age",
DataTypes.INT);
+ LessThanExpression lessThanExpression =
+ new LessThanExpression(columnExpression1, new LiteralExpression("3",
DataTypes.INT));
+ CarbonIUD.getInstance().delete(path, lessThanExpression);
+
+ ColumnExpression columnExpression2 = new ColumnExpression("age",
DataTypes.INT);
+ GreaterThanExpression greaterThanExpression =
+ new GreaterThanExpression(columnExpression2, new
LiteralExpression("16", DataTypes.INT));
+ CarbonIUD.getInstance().delete(path, greaterThanExpression);
+
+ ColumnExpression columnExpression3 = new ColumnExpression("age",
DataTypes.INT);
+ LessThanEqualToExpression lessThanEqualToExpression =
+ new LessThanEqualToExpression(columnExpression3, new
LiteralExpression("5", DataTypes.INT));
+ CarbonIUD.getInstance().delete(path, lessThanEqualToExpression);
+
+ ColumnExpression columnExpression4 = new ColumnExpression("age",
DataTypes.INT);
+ GreaterThanEqualToExpression greaterThanEqualToExpression =
+ new GreaterThanEqualToExpression(columnExpression4,
+ new LiteralExpression("15", DataTypes.INT));
+ CarbonIUD.getInstance().delete(path, greaterThanEqualToExpression);
+
+ CarbonReader reader =
+ CarbonReader.builder(path).projection(new String[] { "name", "age",
"doubleField" })
+ .build();
+
+ int i = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ assert (!((int) row[1] <= 5 || (int) row[1] >= 15));
+ i++;
+ }
+ Assert.assertEquals(i, 9);
+ reader.close();
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+ @Test
+ public void testDeleteWithAndFilter() throws Exception {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+ Field[] fields = new Field[3];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+ fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+ TestUtil.writeFilesAndVerify(20, new Schema(fields), path);
+
+ ColumnExpression columnExpression = new ColumnExpression("doubleField",
DataTypes.DOUBLE);
+ LessThanExpression lessThanExpression1 =
+ new LessThanExpression(columnExpression, new LiteralExpression("3.5",
DataTypes.DOUBLE));
+
+ ColumnExpression columnExpression2 = new ColumnExpression("age",
DataTypes.INT);
+ LessThanExpression lessThanExpression2 =
+ new LessThanExpression(columnExpression2, new LiteralExpression("4",
DataTypes.INT));
+
+ AndExpression andExpression = new AndExpression(lessThanExpression1,
lessThanExpression2);
+ CarbonIUD.getInstance().delete(path, andExpression);
+
+ CarbonReader reader =
+ CarbonReader.builder(path).projection(new String[] { "name", "age",
"doubleField" })
+ .build();
+
+ int i = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ assert (!((int) row[1] < 4));
+ i++;
+ }
+ Assert.assertEquals(i, 16);
+ reader.close();
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+ @Test
+ public void testDeleteWithORFilter() throws Exception {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+ Field[] fields = new Field[3];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+ fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+ TestUtil.writeFilesAndVerify(20, new Schema(fields), path);
+
+ ColumnExpression columnExpression = new ColumnExpression("doubleField",
DataTypes.DOUBLE);
+ LessThanExpression lessThanExpression =
+ new LessThanExpression(columnExpression, new LiteralExpression("3.5",
DataTypes.DOUBLE));
+
+ ColumnExpression columnExpression2 = new ColumnExpression("age",
DataTypes.INT);
+ GreaterThanExpression greaterThanExpression =
+ new GreaterThanExpression(columnExpression2, new
LiteralExpression("11", DataTypes.INT));
+
+ OrExpression orExpression = new OrExpression(lessThanExpression,
greaterThanExpression);
+ CarbonIUD.getInstance().delete(path, orExpression);
+
+ CarbonReader reader =
+ CarbonReader.builder(path).projection(new String[] { "name", "age",
"doubleField" })
+ .build();
+
+ int i = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ assert (!((int) row[1] > 11 || (double) row[2] < 3.5));
+ i++;
+ }
+ Assert.assertEquals(i, 5);
+ reader.close();
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+ @Test
+ public void testDeleteAtMultiplePaths() throws Exception {
+ String path1 = "./testWriteFiles1";
+ String path2 = "./testWriteFiles2";
+ FileUtils.deleteDirectory(new File(path1));
+ FileUtils.deleteDirectory(new File(path2));
+ Field[] fields = new Field[3];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+ fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+ TestUtil.writeFilesAndVerify(10, new Schema(fields), path1);
+ TestUtil.writeFilesAndVerify(10, new Schema(fields), path2);
+ CarbonIUD.getInstance().delete(path1, "age", "2").delete(path2, "age", "3")
+ .delete(path1, "name", "robot2").delete(path2, "name",
"robot3").commit();
+
+ CarbonReader reader1 =
+ CarbonReader.builder(path1).projection(new String[] { "name", "age",
"doubleField" })
+ .build();
+
+ int i = 0;
+ while (reader1.hasNext()) {
+ Object[] row = (Object[]) reader1.readNextRow();
+ assert (!(((String) row[0]).contains("robot2")));
+ assert (((int) (row[1])) != 2);
+ assert ((double) row[2] != 1.0);
+ i++;
+ }
+ Assert.assertEquals(i, 9);
+ reader1.close();
+ FileUtils.deleteDirectory(new File(path1));
+
+ CarbonReader reader2 =
+ CarbonReader.builder(path2).projection(new String[] { "name", "age",
"doubleField" })
+ .build();
+
+ i = 0;
+ while (reader2.hasNext()) {
+ Object[] row = (Object[]) reader2.readNextRow();
+ assert (!(((String) row[0]).contains("robot3")));
+ assert (((int) (row[1])) != 3);
+ assert ((double) row[2] != 1.5);
+ i++;
+ }
+ Assert.assertEquals(i, 9);
+ reader2.close();
+ FileUtils.deleteDirectory(new File(path2));
+ }
+
+ @Test
+ public void testDeleteInMultipleSegments() throws Exception {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+ Field[] fields = new Field[3];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+ fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+ TestUtil.writeFilesAndVerify(10, new Schema(fields), path);
+ TestUtil.writeFilesAndVerify(10, new Schema(fields), path);
+ TestUtil.writeFilesAndVerify(10, new Schema(fields), path);
+ CarbonIUD.getInstance().delete(path, "age", "2").delete(path, "name",
"robot2").commit();
+
+ CarbonReader reader =
+ CarbonReader.builder(path).projection(new String[] { "name", "age",
"doubleField" })
+ .build();
+
+ int i = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ assert (!(((String) row[0]).contains("robot2")));
+ assert (((int) (row[1])) != 2);
+ assert ((double) row[2] != 1.0);
+ i++;
+ }
+ Assert.assertEquals(i, 27);
+ reader.close();
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+ @Test
+ public void testUpdate() throws Exception {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+ Field[] fields = new Field[3];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+ fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+ TestUtil.writeFilesAndVerify(20, new Schema(fields), path);
+ CarbonIUD.getInstance().update(path, "age", "2", "age", "3").commit();
+ CarbonIUD.getInstance().update(path, "name", "robot2", "age", "3")
+ .update(path, "age", "12", "name", "robot13").commit();
+
+ ColumnExpression columnExpression1 = new ColumnExpression("age",
DataTypes.INT);
+ EqualToExpression equalToExpression1 =
+ new EqualToExpression(columnExpression1, new LiteralExpression("3",
DataTypes.INT));
+
+ CarbonReader reader1 =
+ CarbonReader.builder(path).projection(new String[] { "name", "age",
"doubleField" })
+ .filter(equalToExpression1).build();
+
+ int i = 0;
+ while (reader1.hasNext()) {
+ Object[] row = (Object[]) reader1.readNextRow();
+ assert (((int) (row[1])) == 3);
+ i++;
+ }
+ Assert.assertEquals(i, 3);
+ reader1.close();
+
+ CarbonReader reader2 =
+ CarbonReader.builder(path).projection(new String[] { "name", "age",
"doubleField" })
+ .build();
+
+ i = 0;
+ while (reader2.hasNext()) {
+ Object[] row = (Object[]) reader2.readNextRow();
+ assert (((int) (row[1])) != 2);
+ i++;
+ }
+ Assert.assertEquals(i, 20);
+ reader2.close();
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+ @Test
+ public void testDeleteOnUpdatedRows() throws Exception {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+ Field[] fields = new Field[3];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+ fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+ TestUtil.writeFilesAndVerify(20, new Schema(fields), path);
+ Map<String, String> updateColumnToValue = new HashMap<>();
+ updateColumnToValue.put("name", "robot");
+ updateColumnToValue.put("age", "24");
+
+ ColumnExpression columnExpression = new ColumnExpression("age",
DataTypes.INT);
+ LessThanExpression lessThanExpression =
+ new LessThanExpression(columnExpression, new LiteralExpression("10",
DataTypes.INT));
+ CarbonIUD.getInstance().update(path, lessThanExpression,
updateColumnToValue);
+ CarbonIUD.getInstance().delete(path, "doubleField", "2.0").commit();
+
+ ColumnExpression columnExpression1 = new ColumnExpression("age",
DataTypes.INT);
+ EqualToExpression equalToExpression =
+ new EqualToExpression(columnExpression1, new LiteralExpression("24",
DataTypes.INT));
+
+ CarbonReader reader =
+ CarbonReader.builder(path).projection(new String[] { "name", "age",
"doubleField" })
+ .filter(equalToExpression).build();
+
+ int i = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ assert (((String) row[0]).contains("robot"));
+ assert (((int) (row[1])) == 24);
+ i++;
+ }
+ Assert.assertEquals(i, 9);
+ reader.close();
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+ @Test
+ public void testDeleteAndUpdateTogether() throws Exception {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+ Field[] fields = new Field[3];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+ fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+ TestUtil.writeFilesAndVerify(20, new Schema(fields), path);
+ CarbonIUD.getInstance().delete(path, "name", "robot0").delete(path,
"name", "robot1")
+ .delete(path, "name", "robot2").delete(path, "age", "0").delete(path,
"age", "1")
+ .update(path, "name", "robot1", "name", "Karan").update(path, "age",
"1", "age", "0")
+ .commit();
+
+ CarbonReader reader =
+ CarbonReader.builder(path).projection(new String[] { "name", "age",
"doubleField" })
+ .build();
+
+ int i = 0;
+ while (reader.hasNext()) {
+ Object[] row = (Object[]) reader.readNextRow();
+ assert (!((String) row[0]).contains("Karan"));
+ assert (((int) (row[1])) > 1);
+ i++;
+ }
+ Assert.assertEquals(i, 18);
+ reader.close();
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+ @Test
+ public void testIUDOnDifferentDataType() throws Exception {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+ Field[] fields = new Field[11];
+ fields[0] = new Field("stringField", DataTypes.STRING);
+ fields[1] = new Field("shortField", DataTypes.SHORT);
+ fields[2] = new Field("intField", DataTypes.INT);
+ fields[3] = new Field("longField", DataTypes.LONG);
+ fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+ fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+ fields[6] = new Field("dateField", DataTypes.DATE);
+ fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+ fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+ fields[9] = new Field("varcharField", DataTypes.VARCHAR);
+ fields[10] = new Field("arrayField",
DataTypes.createArrayType(DataTypes.STRING));
+ CarbonWriter writer =
+
CarbonWriter.builder().outputPath(path).withLoadOption("complex_delimiter_level_1",
"#")
+ .withCsvInput(new Schema(fields)).writtenBy("IUDTest").build();
+ for (int i = 0; i < 10; i++) {
+ boolean boolValue = true;
+ String decimalValue = "12.345";
+ String dateValue = "2019-03-02";
+ if (i == 6) {
+ boolValue = false;
+ }
+ if (i == 7) {
+ decimalValue = "12.366";
+ }
+ if (i == 8) {
+ dateValue = "2019-03-03";
+ }
+ String[] row2 =
+ new String[] { "robot" + (i % 10), String.valueOf(i % 10000),
String.valueOf(i),
+ String.valueOf(Long.MAX_VALUE - i), String.valueOf((double) i /
2),
+ String.valueOf(boolValue), dateValue, "2019-02-12 03:03:34",
decimalValue,
+ "varchar" + (i % 10), "Hello#World#From#Carbon" };
+ writer.write(row2);
+ }
+ writer.close();
+ CarbonIUD.getInstance().delete(path, "stringField", "robot0").commit();
+ CarbonIUD.getInstance().delete(path, "shortField", "1").commit();
+ CarbonIUD.getInstance().delete(path, "intField", "2").commit();
+ CarbonIUD.getInstance().delete(path, "longField",
"9223372036854775804").commit();
+ CarbonIUD.getInstance().delete(path, "doubleField", "2.0").commit();
+ CarbonIUD.getInstance().delete(path, "varcharField", "varchar5").commit();
+ CarbonIUD.getInstance().delete(path, "boolField", "false").commit();
+ CarbonIUD.getInstance().delete(path, "decimalField", "12.37").commit();
+ CarbonIUD.getInstance().delete(path, "dateField", "2019-03-03").commit();
+ CarbonIUD.getInstance().delete(path, "timeField", "2019-02-12
03:03:34").commit();
+
+ File[] indexFiles = new File(path).listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ if (name == null) {
+ return false;
+ }
+ return name.endsWith("carbonindex");
+ }
+ });
+ if (indexFiles == null || indexFiles.length < 1) {
+ throw new RuntimeException("Carbon index file not exists.");
+ }
+ Schema schema = CarbonSchemaReader.readSchema(indexFiles[0].getAbsolutePath()).asOriginOrder();
+ String[] strings = new String[schema.getFields().length];
+ for (int i = 0; i < schema.getFields().length; i++) {
+ strings[i] = (schema.getFields())[i].getFieldName();
+ }
+ CarbonReader reader =
CarbonReader.builder(path).projection(strings).build();
+ int i = 0;
+ while (reader.hasNext()) {
+ i++;
+ }
+ Assert.assertEquals(i, 0);
+ reader.close();
+ try {
+ CarbonIUD.getInstance().delete(path, "arrayfield",
"{Hello,World,From,Carbon}").commit();
+ } catch (RuntimeException e) {
+ Assert.assertTrue(
+ e.getMessage().contains("IUD operation not supported for Complex
data types"));
+ }
+ FileUtils.deleteDirectory(new File(path));
+ }
+
+ @Test
+ public void testIUDOnComplexType() throws Exception {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+ Field[] fields = new Field[3];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+ fields[2] = new Field("mapRecord",
DataTypes.createMapType(DataTypes.STRING, DataTypes.STRING));
+ String mySchema =
+ "{ " + " \"name\": \"address\", " + " \"type\": \"record\", " + "
\"fields\": [ "
+ + " { " + " \"name\": \"name\", " + " \"type\":
\"string\" " + " }, "
+ + " { " + " \"name\": \"age\", " + " \"type\":
\"int\" " + " }, "
+ + " { " + " \"name\": \"mapRecord\", " + " \"type\":
{ "
+ + " \"type\": \"map\", " + " \"values\": \"string\"
" + " } "
+ + " } " + " ] " + "} ";
+
+ String json =
+ "{\"name\":\"bob\", \"age\":10, \"mapRecord\": {\"street\":
\"k-lane\", \"city\": \"bangalore\"}}";
+ org.apache.avro.Schema nn = new
org.apache.avro.Schema.Parser().parse(mySchema);
+ GenericData.Record record = TestUtil.jsonToAvro(json, mySchema);
+ CarbonWriter writer =
+
CarbonWriter.builder().outputPath(path).withAvroInput(nn).writtenBy("CarbonIUDTest")
+ .build();
+ for (int i = 0; i < 10; i++) {
+ writer.write(record);
+ }
+ writer.close();
+ try {
+ CarbonIUD.getInstance()
+ .delete(path, "mapRecord", "{\"street\": \"k-lane\", \"city\":
\"bangalore\"}").commit();
+ } catch (RuntimeException e) {
+ Assert.assertTrue(
+ e.getMessage().contains("IUD operation not supported for Complex
data types"));
+ }
+ FileUtils.deleteDirectory(new File(path));
+ }
+}