This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 427eddf  [CARBONDATA-3461] Carbon SDK support filter equal values set
427eddf is described below

commit 427eddf96b46031052d5038643fb76576ad7725a
Author: xubo245 <601450...@qq.com>
AuthorDate: Mon Mar 23 00:01:14 2020 +0800

    [CARBONDATA-3461] Carbon SDK support filter equal values set
    
    Add support in the Carbon SDK for filtering on a set of equal values.
    
    1. prepareEqualToExpression(String columnName, String dataType, Object value)
    2. prepareOrExpression(List<Expression> expressions)
    3. prepareEqualToExpressionSet(String columnName, DataType dataType, List<Object> values)
    4. prepareEqualToExpressionSet(String columnName, String dataType, List<Object> values)
    
    This closes #3317
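
    Note: the patch also lets the SDK accept Avro data as JSON strings via
    CarbonWriterBuilder.withAvroInput(String) and AvroCarbonWriter.write(String)
    (see the hunks below). A hedged sketch; the schema string and output path
    here are illustrative only:

        String avroSchemaJson = "{\"name\": \"rec\", \"type\": \"record\", "
            + "\"fields\": [{\"name\": \"fileName\", \"type\": \"string\"}, "
            + "{\"name\": \"id\", \"type\": \"int\"}]}";
        CarbonWriter writer = CarbonWriter.builder()
            .outputPath("./testWriteFiles")
            .withAvroInput(avroSchemaJson)
            .writtenBy("example")
            .build();
        writer.write("{\"fileName\": \"bob\", \"id\": 10}");
        writer.close();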
---
 .../carbondata/core/scan/filter/FilterUtil.java    | 107 +++++++++++-
 .../carbondata/sdk/file/AvroCarbonWriter.java      |  47 ++++-
 .../carbondata/sdk/file/CarbonWriterBuilder.java   |  21 ++-
 .../carbondata/sdk/file/AvroCarbonWriterTest.java  | 119 +++++++++++++
 .../carbondata/sdk/file/CarbonReaderTest.java      | 193 +++++++++++++++++++--
 .../org/apache/carbondata/sdk/file/ImageTest.java  |  65 ++++++-
 6 files changed, 527 insertions(+), 25 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 50580ab..748bafd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -49,13 +49,11 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
-import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression;
-import org.apache.carbondata.core.scan.expression.conditional.ImplicitExpression;
-import org.apache.carbondata.core.scan.expression.conditional.InExpression;
-import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
+import org.apache.carbondata.core.scan.expression.conditional.*;
 import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
 import org.apache.carbondata.core.scan.expression.logical.TrueExpression;
 import org.apache.carbondata.core.scan.filter.executer.AndFilterExecuterImpl;
 import org.apache.carbondata.core.scan.filter.executer.DimColumnExecuterFilterInfo;
@@ -1291,4 +1289,105 @@ public final class FilterUtil {
     return ByteUtil.convertIntToBytes(key);
   }
 
+  public static Expression prepareEqualToExpression(String columnName, String dataType,
+      Object value) {
+    if (DataTypes.STRING.getName().equalsIgnoreCase(dataType)) {
+      return new EqualToExpression(
+          new ColumnExpression(columnName, DataTypes.STRING),
+          new LiteralExpression(value, DataTypes.STRING));
+    } else if (DataTypes.INT.getName().equalsIgnoreCase(dataType)) {
+      return new EqualToExpression(
+          new ColumnExpression(columnName, DataTypes.INT),
+          new LiteralExpression(value, DataTypes.INT));
+    } else if (DataTypes.DOUBLE.getName().equalsIgnoreCase(dataType)) {
+      return new EqualToExpression(
+          new ColumnExpression(columnName, DataTypes.DOUBLE),
+          new LiteralExpression(value, DataTypes.DOUBLE));
+    } else if (DataTypes.FLOAT.getName().equalsIgnoreCase(dataType)) {
+      return new EqualToExpression(
+          new ColumnExpression(columnName, DataTypes.FLOAT),
+          new LiteralExpression(value, DataTypes.FLOAT));
+    } else if (DataTypes.SHORT.getName().equalsIgnoreCase(dataType)) {
+      return new EqualToExpression(
+          new ColumnExpression(columnName, DataTypes.SHORT),
+          new LiteralExpression(value, DataTypes.SHORT));
+    } else if (DataTypes.BINARY.getName().equalsIgnoreCase(dataType)) {
+      return new EqualToExpression(
+          new ColumnExpression(columnName, DataTypes.BINARY),
+          new LiteralExpression(value, DataTypes.BINARY));
+    } else if (DataTypes.DATE.getName().equalsIgnoreCase(dataType)) {
+      return new EqualToExpression(
+          new ColumnExpression(columnName, DataTypes.DATE),
+          new LiteralExpression(value, DataTypes.DATE));
+    } else if (DataTypes.LONG.getName().equalsIgnoreCase(dataType)) {
+      return new EqualToExpression(
+          new ColumnExpression(columnName, DataTypes.LONG),
+          new LiteralExpression(value, DataTypes.LONG));
+    } else if (DataTypes.TIMESTAMP.getName().equalsIgnoreCase(dataType)) {
+      return new EqualToExpression(
+          new ColumnExpression(columnName, DataTypes.TIMESTAMP),
+          new LiteralExpression(value, DataTypes.TIMESTAMP));
+    } else if (DataTypes.BYTE.getName().equalsIgnoreCase(dataType)) {
+      return new EqualToExpression(
+          new ColumnExpression(columnName, DataTypes.BYTE),
+          new LiteralExpression(value, DataTypes.BYTE));
+    } else {
+      throw new IllegalArgumentException("Unsupported data type: " + dataType);
+    }
+  }
+
+  public static Expression prepareOrExpression(List<Expression> expressions) {
+    if (expressions.size() < 2) {
+      throw new RuntimeException("Please input at least two expressions");
+    }
+    Expression expression = expressions.get(0);
+    for (int i = 1; i < expressions.size(); i++) {
+      expression = new OrExpression(expression, expressions.get(i));
+    }
+    return expression;
+  }
+
+  private static Expression prepareEqualToExpressionSet(String columnName, DataType dataType,
+      List<Object> values) {
+    Expression expression = null;
+    if (0 == values.size()) {
+      expression = prepareEqualToExpression(columnName, dataType.getName(), null);
+    } else {
+      expression = prepareEqualToExpression(columnName, dataType.getName(), values.get(0));
+    }
+    for (int i = 1; i < values.size(); i++) {
+      Expression expression2 = prepareEqualToExpression(columnName,
+          dataType.getName(), values.get(i));
+      expression = new OrExpression(expression, expression2);
+    }
+    return expression;
+  }
+
+  public static Expression prepareEqualToExpressionSet(String columnName, String dataType,
+      List<Object> values) {
+    if (DataTypes.STRING.getName().equalsIgnoreCase(dataType)) {
+      return prepareEqualToExpressionSet(columnName, DataTypes.STRING, values);
+    } else if (DataTypes.INT.getName().equalsIgnoreCase(dataType)) {
+      return prepareEqualToExpressionSet(columnName, DataTypes.INT, values);
+    } else if (DataTypes.DOUBLE.getName().equalsIgnoreCase(dataType)) {
+      return prepareEqualToExpressionSet(columnName, DataTypes.DOUBLE, values);
+    } else if (DataTypes.FLOAT.getName().equalsIgnoreCase(dataType)) {
+      return prepareEqualToExpressionSet(columnName, DataTypes.FLOAT, values);
+    } else if (DataTypes.SHORT.getName().equalsIgnoreCase(dataType)) {
+      return prepareEqualToExpressionSet(columnName, DataTypes.SHORT, values);
+    } else if (DataTypes.BINARY.getName().equalsIgnoreCase(dataType)) {
+      return prepareEqualToExpressionSet(columnName, DataTypes.BINARY, values);
+    } else if (DataTypes.DATE.getName().equalsIgnoreCase(dataType)) {
+      return prepareEqualToExpressionSet(columnName, DataTypes.DATE, values);
+    } else if (DataTypes.LONG.getName().equalsIgnoreCase(dataType)) {
+      return prepareEqualToExpressionSet(columnName, DataTypes.LONG, values);
+    } else if (DataTypes.TIMESTAMP.getName().equalsIgnoreCase(dataType)) {
+      return prepareEqualToExpressionSet(columnName, DataTypes.TIMESTAMP, values);
+    } else if (DataTypes.BYTE.getName().equalsIgnoreCase(dataType)) {
+      return prepareEqualToExpressionSet(columnName, DataTypes.BYTE, values);
+    } else {
+      throw new IllegalArgumentException("Unsupported data type: " + dataType);
+    }
+  }
+
 }
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index ba85c06..93271d2 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -17,7 +17,11 @@
 
 package org.apache.carbondata.sdk.file;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
@@ -31,6 +35,7 @@ import java.util.UUID;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -48,7 +53,12 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.JsonDecoder;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
@@ -86,6 +96,12 @@ import org.apache.log4j.Logger;
     this.writable = new ObjectArrayWritable();
   }
 
+  AvroCarbonWriter(CarbonLoadModel loadModel, Configuration hadoopConf, Schema avroSchema)
+    throws IOException {
+    this(loadModel, hadoopConf);
+    this.avroSchema = avroSchema;
+  }
+
   private Object[] avroToCsv(GenericData.Record avroRecord) {
     if (avroSchema == null) {
       avroSchema = avroRecord.getSchema();
@@ -766,7 +782,36 @@ import org.apache.log4j.Logger;
   @Override
   public void write(Object object) throws IOException {
     try {
-      GenericData.Record record = (GenericData.Record) object;
+      GenericData.Record record = null;
+      if (object instanceof GenericData.Record) {
+        record = (GenericData.Record) object;
+      } else if (object instanceof String) {
+        String json = (String) object;
+        InputStream input = null;
+        DataFileWriter writer = null;
+        ByteArrayOutputStream output = null;
+        try {
+          GenericDatumReader reader = new GenericDatumReader(this.avroSchema);
+          input = new ByteArrayInputStream(json.getBytes(CarbonCommonConstants.DEFAULT_CHARSET));
+          output = new ByteArrayOutputStream();
+          DataInputStream din = new DataInputStream(input);
+          writer = new DataFileWriter(new GenericDatumWriter());
+          writer.create(this.avroSchema, output);
+          JsonDecoder decoder = DecoderFactory.get().jsonDecoder(this.avroSchema, din);
+          record = (GenericData.Record) reader.read(null, decoder);
+        } finally {
+          if (input != null) {
+            input.close();
+          }
+          if (writer != null) {
+            writer.close();
+          }
+        }
+      } else {
+        throw new UnsupportedOperationException(
+          "carbon not support " + object + ", only support GenericData.Record 
" + "and String for "
+            + this.getClass().getName());
+      }
 
       // convert Avro record to CSV String[]
       Object[] csvRecord = avroToCsv(record);
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 31fcdd1..87bd705 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.conf.Configuration;
 @InterfaceStability.Unstable
 public class CarbonWriterBuilder {
   private Schema schema;
+  private org.apache.avro.Schema avroSchema;
   private String path;
   //initialize with empty array , as no columns should be selected for sorting in NO_SORT
   private String[] sortColumns = new String[0];
@@ -557,9 +558,21 @@ public class CarbonWriterBuilder {
    */
   public CarbonWriterBuilder withAvroInput(org.apache.avro.Schema avroSchema) {
     Objects.requireNonNull(avroSchema, "Avro schema should not be null");
-    if (this.schema != null) {
-      throw new IllegalArgumentException("schema should be set only once");
-    }
+    this.avroSchema = avroSchema;
+    this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema);
+    this.writerType = WRITER_TYPE.AVRO;
+    return this;
+  }
+
+  /**
+   * to build a {@link CarbonWriter}, which accepts Avro objects
+   *
+   * @param schema Avro schema as a JSON string
+   * @return CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder withAvroInput(String schema) {
+    Objects.requireNonNull(schema, "Avro schema should not be null");
+    this.avroSchema = new org.apache.avro.Schema.Parser().parse(schema);
     this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema);
     this.writerType = WRITER_TYPE.AVRO;
     return this;
@@ -647,7 +660,7 @@ public class CarbonWriterBuilder {
       // removed from the load. LoadWithoutConverter flag is going to point to the Loader Builder
       // which will skip Conversion Step.
       loadModel.setLoadWithoutConverterStep(true);
-      return new AvroCarbonWriter(loadModel, hadoopConf);
+      return new AvroCarbonWriter(loadModel, hadoopConf, this.avroSchema);
     } else if (this.writerType == WRITER_TYPE.JSON) {
       loadModel.setJsonFileLoad(true);
       return new JsonCarbonWriter(loadModel, hadoopConf);
diff --git a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
index 2f1e336..6cbce82 100644
--- a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
+++ b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -337,6 +337,71 @@ public class AvroCarbonWriterTest {
     }
   }
 
+  private void writeAvroComplexDataAndRead(String mySchema)
+      throws IOException, InvalidLoadOptionException, InterruptedException {
+
+    // conversion to GenericData.Record
+    Schema nn = new Schema.Parser().parse(mySchema);
+    try {
+      CarbonWriter writer =
+          CarbonWriter.builder()
+              .outputPath(path)
+              .withAvroInput(mySchema)
+              .writtenBy("AvroCarbonWriterTest")
+              .build();
+      int numOfRows = 100000/100;
+      int numOfWrite = 20000;
+      int arrayLength = 300;
+      for (int i = 0; i < numOfRows; i++) {
+        StringBuffer aa1 = new StringBuffer();
+        StringBuffer bb1 = new StringBuffer();
+        StringBuffer cc1 = new StringBuffer();
+        aa1.append("[0.1234567,0.2,-0.3,0.4]");
+        bb1.append("[0.2123456]");
+        cc1.append("[0.3123456]");
+        for (int j = 1; j < arrayLength; j++) {
+          aa1.append(",[1" + i + "" + j + ".1234567,1" + i + "" + j + ".2,-1" 
+ i + "" + j + ".3,1" + i + "" + j + ".4]");
+          bb1.append(",[2" + i + "" + j + ".2123456,-2" + i + "" + j + ".2]");
+          cc1.append(",[3" + i + "" + j + ".3123456]");
+        }
+        String json = "{\"fileName\":\"bob\", \"id\":10, "
+            + "   \"aa1\" : [" + aa1 + "], "
+            + "\"bb1\" : [" + bb1 + "], " +
+            "\"cc1\" : [" + cc1 + "]}";
+
+        writer.write(json);
+        if (i > 0 && i % numOfWrite == 0) {
+          writer.close();
+          writer =
+              CarbonWriter.builder()
+                  .outputPath(path)
+                  .withAvroInput(mySchema)
+                  .writtenBy("AvroCarbonWriterTest")
+                  .build();
+        }
+      }
+      writer.close();
+      String[] projection = new String[nn.getFields().size()];
+      for (int j = 0; j < nn.getFields().size(); j++) {
+        projection[j] = nn.getFields().get(j).name();
+      }
+      CarbonReader carbonReader = CarbonReader.builder().projection(projection).withFolder(path).build();
+      int sum = 0;
+      while (carbonReader.hasNext()) {
+        sum++;
+        Object[] row = (Object[]) carbonReader.readNextRow();
+        Assert.assertTrue(row.length == 5);
+        Object[] aa1 = (Object[]) row[2];
+        Assert.assertTrue(aa1.length == arrayLength);
+        Object[] aa2 = (Object[]) aa1[1];
+        Assert.assertTrue(aa2.length == 4 || aa2.length == 2 || aa2.length == 1);
+      }
+      Assert.assertTrue(sum == numOfRows);
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
 
   @Test
   public void testWriteComplexRecord() throws IOException, InvalidLoadOptionException {
@@ -433,6 +498,60 @@ public class AvroCarbonWriterTest {
   }
 
   @Test
+  public void testWriteArrayArrayFloat() throws IOException {
+    FileUtils.deleteDirectory(new File(path));
+
+    String mySchema =
+        "{" +
+            "  \"name\": \"address\", " +
+            "   \"type\": \"record\", " +
+            "    \"fields\": [  " +
+            "  { \"name\": \"fileName\", \"type\": \"string\"}, " +
+            "  { \"name\": \"id\", \"type\": \"int\"}, " +
+            "  {\"name\" :\"aa1\", " +
+            "   \"type\" : { " +
+            "   \"type\" :\"array\", " +
+            "   \"items\":{ " +
+            "   \"name\" :\"aa2\", " +
+            "   \"type\" :\"array\", " +
+            "   \"items\":{ " +
+            "   \"name\" :\"f1\", " +
+            "   \"type\" : \"float\", " +
+            "   \"default\":-1}}}}," +
+            "  {\"name\" :\"bb1\", " +
+            "   \"type\" : { " +
+            "   \"type\" :\"array\", " +
+            "   \"items\":{ " +
+            "   \"name\" :\"bb2\", " +
+            "   \"type\" :\"array\", " +
+            "   \"items\":{ " +
+            "   \"name\" :\"f2\", " +
+            "   \"type\" : \"float\", " +
+            "   \"default\":-1}}}}," +
+            "  {\"name\" :\"cc1\", " +
+            "   \"type\" : { " +
+            "   \"type\" :\"array\", " +
+            "   \"items\":{ " +
+            "   \"name\" :\"cc2\", " +
+            "   \"type\" :\"array\", " +
+            "   \"items\":{ " +
+            "   \"name\" :\"f3\", " +
+            "   \"type\" : \"float\", " +
+            "   \"default\":-1}}}}" +
+            "] " +
+            "}";
+
+    try {
+      writeAvroComplexDataAndRead(mySchema);
+      Assert.assertTrue(true);
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
   public void testExceptionForDuplicateColumns() throws IOException, InvalidLoadOptionException {
     Field[] field = new Field[2];
     field[0] = new Field("name", DataTypes.STRING);
diff --git a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
index b108b47..153be5e 100644
--- a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
+++ b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java
@@ -17,15 +17,8 @@
 
 package org.apache.carbondata.sdk.file;
 
-import java.io.*;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.util.*;
-
+import junit.framework.TestCase;
 import org.apache.avro.generic.GenericData;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.log4j.Logger;
-
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -35,15 +28,42 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.Field;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
-import org.apache.carbondata.core.scan.expression.conditional.*;
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.scan.expression.conditional.GreaterThanExpression;
+import org.apache.carbondata.core.scan.expression.conditional.InExpression;
+import org.apache.carbondata.core.scan.expression.conditional.LessThanExpression;
+import org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpression;
+import org.apache.carbondata.core.scan.expression.conditional.NotInExpression;
 import org.apache.carbondata.core.scan.expression.logical.AndExpression;
 import org.apache.carbondata.core.scan.expression.logical.OrExpression;
 import org.apache.carbondata.core.util.CarbonProperties;
-
-import junit.framework.TestCase;
 import org.apache.commons.io.FileUtils;
-import org.junit.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.carbondata.core.scan.filter.FilterUtil.prepareEqualToExpression;
+import static org.apache.carbondata.core.scan.filter.FilterUtil.prepareEqualToExpressionSet;
+import static org.apache.carbondata.core.scan.filter.FilterUtil.prepareOrExpression;
 
 public class CarbonReaderTest extends TestCase {
 
@@ -422,6 +442,155 @@ public class CarbonReaderTest extends TestCase {
   }
 
   @Test
+  public void testReadWithFilterEqualSet() throws IOException, InterruptedException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    fields[2] = new Field("doubleField", DataTypes.DOUBLE);
+
+    TestUtil.writeFilesAndVerify(200, new Schema(fields), path);
+
+    List<Object> values = new ArrayList<>();
+    values.add("robot7");
+    values.add("robot1");
+
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .projection(new String[]{"name", "age", "doubleField"})
+        .filter(prepareEqualToExpressionSet("name", "String", values))
+        .build();
+
+    int i = 0;
+    while (reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      if (((String) row[0]).contains("robot7")) {
+        assert (7 == ((int) (row[1]) % 10));
+        assert (0.5 == ((double) (row[2]) % 1));
+      } else if (((String) row[0]).contains("robot1")) {
+        assert (1 == ((int) (row[1]) % 10));
+        assert (0.5 == ((double) (row[2]) % 1));
+      } else {
+        Assert.assertTrue(false);
+      }
+      i++;
+    }
+    Assert.assertEquals(i, 40);
+
+    reader.close();
+
+    List<Object> values2 = new ArrayList<>();
+    values2.add(1);
+    values2.add(7);
+
+    CarbonReader reader2 = CarbonReader
+        .builder(path, "_temp")
+        .projection(new String[]{"name", "age", "doubleField"})
+        .filter(prepareEqualToExpressionSet("age", "int", values2))
+        .build();
+
+    i = 0;
+    while (reader2.hasNext()) {
+      Object[] row = (Object[]) reader2.readNextRow();
+      if (((String) row[0]).contains("robot7")) {
+        assert (7 == ((int) (row[1]) % 10));
+        assert (0.5 == ((double) (row[2]) % 1));
+      } else if (((String) row[0]).contains("robot1")) {
+        assert (1 == ((int) (row[1]) % 10));
+        assert (0.5 == ((double) (row[2]) % 1));
+      } else {
+        Assert.assertTrue(false);
+      }
+      i++;
+    }
+    Assert.assertEquals(i, 2);
+    reader2.close();
+
+
+    List<Object> values3 = new ArrayList<>();
+    values3.add(0.5);
+    values3.add(3.5);
+    CarbonReader reader3 = CarbonReader
+        .builder(path, "_temp")
+        .projection(new String[]{"name", "age", "doubleField"})
+        .filter(prepareEqualToExpressionSet("doubleField", "double", values3))
+        .build();
+
+    i = 0;
+    while (reader3.hasNext()) {
+      Object[] row = (Object[]) reader3.readNextRow();
+      if (((String) row[0]).contains("robot7")) {
+        assert (7 == ((int) (row[1]) % 10));
+        assert (0.5 == ((double) (row[2]) % 1));
+      } else if (((String) row[0]).contains("robot1")) {
+        assert (1 == ((int) (row[1]) % 10));
+        assert (0.5 == ((double) (row[2]) % 1));
+      } else {
+        Assert.assertTrue(false);
+      }
+      i++;
+    }
+    Assert.assertEquals(i, 2);
+    reader3.close();
+
+    CarbonReader reader4 = CarbonReader
+        .builder(path, "_temp")
+        .projection(new String[]{"name", "age", "doubleField"})
+        .filter(prepareEqualToExpression("name", "string", "robot7"))
+        .build();
+
+    i = 0;
+    while (reader4.hasNext()) {
+      Object[] row = (Object[]) reader4.readNextRow();
+      if (((String) row[0]).contains("robot7")) {
+        assert (7 == ((int) (row[1]) % 10));
+        assert (0.5 == ((double) (row[2]) % 1));
+      } else {
+        Assert.assertTrue(false);
+      }
+      i++;
+    }
+    Assert.assertEquals(i, 20);
+    reader4.close();
+
+    List<Expression> expressions = new ArrayList<>();
+    expressions.add(prepareEqualToExpression("name", "String", "robot1"));
+    expressions.add(prepareEqualToExpression("name", "String", "robot7"));
+    expressions.add(prepareEqualToExpression("age", "int", "2"));
+
+    CarbonReader reader5 = CarbonReader
+        .builder(path, "_temp")
+        .projection(new String[]{"name", "age", "doubleField"})
+        .filter(prepareOrExpression(expressions))
+        .build();
+
+    i = 0;
+    while (reader5.hasNext()) {
+      Object[] row = (Object[]) reader5.readNextRow();
+      if (((String) row[0]).contains("robot7")) {
+        assert (7 == ((int) (row[1]) % 10));
+        assert (0.5 == ((double) (row[2]) % 1));
+      } else if (((String) row[0]).contains("robot1")) {
+        assert (1 == ((int) (row[1]) % 10));
+        assert (0.5 == ((double) (row[2]) % 1));
+      } else if (((String) row[0]).contains("robot2")) {
+        assert (2 == ((int) (row[1]) % 10));
+        assert (0 == ((double) (row[2]) % 1));
+      } else {
+        Assert.assertTrue(false);
+      }
+      i++;
+    }
+    Assert.assertEquals(i, 41);
+
+    reader5.close();
+
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
   public void testReadWithFilterOfNonTransactionalLessThan() throws IOException, InterruptedException {
     String path = "./testWriteFiles";
     FileUtils.deleteDirectory(new File(path));
diff --git a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
index 6ad381e..88578af 100644
--- a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
+++ b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java
@@ -18,7 +18,6 @@
 package org.apache.carbondata.sdk.file;
 
 import junit.framework.TestCase;
-
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -30,7 +29,6 @@ import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.util.BinaryUtil;
-
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.io.FileUtils;
@@ -47,11 +45,18 @@ import javax.imageio.stream.FileImageInputStream;
 import javax.imageio.stream.ImageInputStream;
 import java.awt.color.ColorSpace;
 import java.awt.image.BufferedImage;
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import static org.apache.carbondata.core.scan.filter.FilterUtil.prepareEqualToExpression;
 import static org.apache.carbondata.sdk.file.utils.SDKUtil.listFiles;
 
 public class ImageTest extends TestCase {
@@ -322,6 +327,7 @@ public class ImageTest extends TestCase {
     byte[] originBinary = null;
 
     // read and write image data
+    String binaryValue = null;
     for (int j = 0; j < num; j++) {
       CarbonWriter writer = CarbonWriter
           .builder()
@@ -340,7 +346,8 @@ public class ImageTest extends TestCase {
           hexValue = Hex.encodeHex(originBinary);
         }
         // write data
-        writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf(hexValue)});
+        binaryValue = String.valueOf(hexValue);
+        writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), binaryValue});
         bis.close();
       }
       writer.close();
@@ -407,6 +414,56 @@ public class ImageTest extends TestCase {
       i++;
     }
     reader2.close();
+
+
+    // Read data with filter for binary
+
+    CarbonReader reader3 = CarbonReader
+        .builder(path, "_temp")
+        .filter(prepareEqualToExpression("image", "binary", binaryValue))
+        .build();
+
+    System.out.println("\nData:");
+    i = 0;
+    while (i < 20 && reader3.hasNext()) {
+      Object[] row = (Object[]) reader3.readNextRow();
+
+      byte[] outputBinary = Hex.decodeHex(new String((byte[]) row[1]).toCharArray());
+      System.out.println(row[0] + " " + row[2] + " image size:" + outputBinary.length);
+
+      // validate output binary data and origin binary data
+      assert (originBinary.length == outputBinary.length);
+      for (int j = 0; j < originBinary.length; j++) {
+        assert (originBinary[j] == outputBinary[j]);
+      }
+      String value = new String(outputBinary);
+      Assert.assertTrue(value.startsWith("�PNG"));
+      // save image, user can compare the save image and original image
+      String destString = "./target/binary/image" + i + ".jpg";
+      BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(destString));
+      bos.write(outputBinary);
+      bos.close();
+      i++;
+    }
+    assert (1 == i);
+    System.out.println("\nFinished");
+    reader3.close();
+
+
+    CarbonReader reader4 = CarbonReader
+        .builder(path, "_temp")
+        .filter(prepareEqualToExpression("image", "binary", "hello"))
+        .build();
+
+    System.out.println("\nData:");
+    i = 0;
+    while (i < 20 && reader4.hasNext()) {
+      Object[] row = (Object[]) reader4.readNextRow();
+      assert (null == row[1]);
+    }
+    System.out.println("\nFinished");
+    reader4.close();
+
     try {
       FileUtils.deleteDirectory(new File(path));
     } catch (IOException e) {
