This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ebf7f6  [CARBONDATA-3611] Fix filter failure on measure columns of a stream table that includes complex columns
3ebf7f6 is described below

commit 3ebf7f61f3326bdc9f16a1665d44bea82828d3f7
Author: Zhang Zhichao <441586...@qq.com>
AuthorDate: Sat Dec 7 11:25:04 2019 +0800

    [CARBONDATA-3611] Fix filter failure on measure columns of a stream table that includes complex columns
    
    Problem:
    Filtering on measure columns fails for a stream table when the table includes complex columns.
    
    Solution:
    Use 'segmentProperties.getDimensions().size()' instead of 'segmentProperties.getLastDimensionColOrdinal()' when setting 'columnResolvedFilterInfo.setColumnIndexInMinMaxByteArray' for stream data files.
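
    For illustration, here is a minimal standalone sketch of the offset arithmetic this
    change switches between. It is not CarbonData's actual SegmentProperties or
    ColumnResolvedFilterInfo code, and the class name and column counts below are
    hypothetical; it only shows why the measure's slot differs once the min/max cache
    of a stream data file excludes complex columns.

    // Hypothetical sketch (illustrative counts, not real CarbonData classes).
    public class MinMaxSlotSketch {
      public static void main(String[] args) {
        int dimensionCount = 4;       // plays the role of segmentProperties.getDimensions().size()
        int complexOrdinalSpan = 2;   // extra ordinals spanned by complex columns in columnar files
        int measureOrdinal = 0;       // ordinal of the filtered measure, e.g. 'salary'

        // columnar data file: complex columns are part of the min/max layout,
        // so the measure slot starts after the last dimension ordinal
        int lastDimensionColOrdinal = dimensionCount + complexOrdinalSpan;
        int columnarSlot = lastDimensionColOrdinal + measureOrdinal;   // 6

        // stream data file: the min/max cache holds no complex columns, so using
        // lastDimensionColOrdinal would point past the cached entries; only the
        // plain dimension count offsets the measure ordinal
        int streamSlot = dimensionCount + measureOrdinal;              // 4

        System.out.println("columnar file slot = " + columnarSlot);
        System.out.println("stream file slot   = " + streamSlot);
      }
    }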
    
    This closes #3503
---
 .../indexstore/blockletindex/BlockDataMap.java     |   8 +-
 .../scan/executor/impl/AbstractQueryExecutor.java  |   2 +-
 .../carbondata/core/scan/filter/FilterUtil.java    |  57 ++--
 .../carbondata/core/stream/StreamPruner.java       |   2 +-
 .../datamap/examples/MinMaxIndexDataMap.java       |   2 +-
 .../hadoop/stream/StreamRecordReader.java          |   4 +-
 .../carbondata/TestStreamingTableQueryFilter.scala | 315 +++++++++++++++++++++
 7 files changed, 360 insertions(+), 30 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index e83985f..c865603 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -610,8 +610,8 @@ public class BlockDataMap extends CoarseGrainDataMap
 
   @Override
   public boolean isScanRequired(FilterResolverIntf filterExp) {
-    FilterExecuter filterExecuter = FilterUtil
-        .getFilterExecuterTree(filterExp, getSegmentProperties(), null, getMinMaxCacheColumns());
+    FilterExecuter filterExecuter = FilterUtil.getFilterExecuterTree(
+        filterExp, getSegmentProperties(), null, getMinMaxCacheColumns(), false);
     DataMapRow unsafeRow = taskSummaryDMStore
         .getDataMapRow(getTaskSummarySchema(), taskSummaryDMStore.getRowCount() - 1);
     boolean isScanRequired = FilterExpressionProcessor
@@ -741,8 +741,8 @@ public class BlockDataMap extends CoarseGrainDataMap
       // Remove B-tree jump logic as start and end key prepared is not
       // correct for old store scenarios
       int entryIndex = 0;
-      FilterExecuter filterExecuter = FilterUtil
-          .getFilterExecuterTree(filterExp, getSegmentProperties(), null, getMinMaxCacheColumns());
+      FilterExecuter filterExecuter = FilterUtil.getFilterExecuterTree(
+          filterExp, getSegmentProperties(), null, getMinMaxCacheColumns(), false);
       // flag to be used for deciding whether use min/max in executor pruning for BlockletDataMap
       boolean useMinMaxForPruning = useMinMaxForExecutorPruning(filterExp);
       // min and max for executor pruning
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 9688868..c891ba2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -522,7 +522,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
       filterResolverIntf = queryModel.getDataMapFilter().getResolver();
       blockExecutionInfo.setFilterExecuterTree(
           FilterUtil.getFilterExecuterTree(filterResolverIntf, segmentProperties,
-              blockExecutionInfo.getComlexDimensionInfoMap()));
+              blockExecutionInfo.getComlexDimensionInfoMap(), false));
     }
     try {
       startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index a096051..679ee43 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -134,12 +134,16 @@ public final class FilterUtil {
    *
    * @param filterExpressionResolverTree
    * @param segmentProperties
+   * @param complexDimensionInfoMap
+   * @param minMaxCacheColumns
+   * @param isStreamDataFile whether to create the filter executer tree for stream data files
    * @return FilterExecuter instance
+   *
    */
   private static FilterExecuter createFilterExecuterTree(
       FilterResolverIntf filterExpressionResolverTree, SegmentProperties segmentProperties,
       Map<Integer, GenericQueryType> complexDimensionInfoMap,
-      List<CarbonColumn> minMaxCacheColumns) {
+      List<CarbonColumn> minMaxCacheColumns, boolean isStreamDataFile) {
     FilterExecuterType filterExecuterType = filterExpressionResolverTree.getFilterExecuterType();
     if (null != filterExecuterType) {
       switch (filterExecuterType) {
@@ -154,7 +158,7 @@ public final class FilterUtil {
           }
           // return true filter expression if filter column min/max is not cached in driver
           if (checkIfCurrentNodeToBeReplacedWithTrueFilterExpression(filterExpressionResolverTree,
-              segmentProperties, minMaxCacheColumns)) {
+              segmentProperties, minMaxCacheColumns, isStreamDataFile)) {
             return new TrueFilterExecutor();
           }
           return getIncludeFilterExecuter(
@@ -167,15 +171,15 @@ public final class FilterUtil {
         case OR:
           return new OrFilterExecuterImpl(
               createFilterExecuterTree(filterExpressionResolverTree.getLeft(), segmentProperties,
-                  complexDimensionInfoMap, minMaxCacheColumns),
+                  complexDimensionInfoMap, minMaxCacheColumns, isStreamDataFile),
               createFilterExecuterTree(filterExpressionResolverTree.getRight(), segmentProperties,
-                  complexDimensionInfoMap, minMaxCacheColumns));
+                  complexDimensionInfoMap, minMaxCacheColumns, isStreamDataFile));
         case AND:
           return new AndFilterExecuterImpl(
               createFilterExecuterTree(filterExpressionResolverTree.getLeft(), segmentProperties,
-                  complexDimensionInfoMap, minMaxCacheColumns),
+                  complexDimensionInfoMap, minMaxCacheColumns, isStreamDataFile),
               createFilterExecuterTree(filterExpressionResolverTree.getRight(), segmentProperties,
-                  complexDimensionInfoMap, minMaxCacheColumns));
+                  complexDimensionInfoMap, minMaxCacheColumns, isStreamDataFile));
         case ROWLEVEL_LESSTHAN:
         case ROWLEVEL_LESSTHAN_EQUALTO:
         case ROWLEVEL_GREATERTHAN_EQUALTO:
@@ -186,7 +190,7 @@ public final class FilterUtil {
           if (checkIfCurrentNodeToBeReplacedWithTrueFilterExpression(
               rowLevelRangeFilterResolver.getDimColEvaluatorInfoList(),
               rowLevelRangeFilterResolver.getMsrColEvalutorInfoList(), segmentProperties,
-              minMaxCacheColumns)) {
+              minMaxCacheColumns, isStreamDataFile)) {
             return new TrueFilterExecutor();
           }
           return RowLevelRangeTypeExecuterFactory
@@ -195,7 +199,7 @@ public final class FilterUtil {
         case RANGE:
           // return true filter expression if filter column min/max is not cached in driver
           if (checkIfCurrentNodeToBeReplacedWithTrueFilterExpression(filterExpressionResolverTree,
-              segmentProperties, minMaxCacheColumns)) {
+              segmentProperties, minMaxCacheColumns, isStreamDataFile)) {
             return new TrueFilterExecutor();
           }
           return new RangeValueFilterExecuterImpl(
@@ -291,20 +295,21 @@ public final class FilterUtil {
   private static boolean checkIfCurrentNodeToBeReplacedWithTrueFilterExpression(
       List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
       List<MeasureColumnResolvedFilterInfo> msrColEvaluatorInfoList,
-      SegmentProperties segmentProperties, List<CarbonColumn> minMaxCacheColumns) {
+      SegmentProperties segmentProperties, List<CarbonColumn> minMaxCacheColumns,
+      boolean isStreamDataFile) {
     boolean replaceCurrentNodeWithTrueFilter = false;
     ColumnResolvedFilterInfo columnResolvedFilterInfo = null;
     if (!msrColEvaluatorInfoList.isEmpty()) {
       columnResolvedFilterInfo = msrColEvaluatorInfoList.get(0);
       replaceCurrentNodeWithTrueFilter =
           checkIfFilterColumnIsCachedInDriver(columnResolvedFilterInfo, segmentProperties,
-              minMaxCacheColumns, true);
+              minMaxCacheColumns, true, isStreamDataFile);
     } else {
       columnResolvedFilterInfo = dimColEvaluatorInfoList.get(0);
       if (!columnResolvedFilterInfo.getDimension().hasEncoding(Encoding.IMPLICIT)) {
         replaceCurrentNodeWithTrueFilter =
             checkIfFilterColumnIsCachedInDriver(columnResolvedFilterInfo, segmentProperties,
-                minMaxCacheColumns, false);
+                minMaxCacheColumns, false, isStreamDataFile);
       }
     }
     return replaceCurrentNodeWithTrueFilter;
@@ -317,24 +322,25 @@ public final class FilterUtil {
    * @param filterExpressionResolverTree
    * @param segmentProperties
    * @param minMaxCacheColumns
+   * @param isStreamDataFile
    * @return
    */
   private static boolean checkIfCurrentNodeToBeReplacedWithTrueFilterExpression(
       FilterResolverIntf filterExpressionResolverTree, SegmentProperties segmentProperties,
-      List<CarbonColumn> minMaxCacheColumns) {
+      List<CarbonColumn> minMaxCacheColumns, boolean isStreamDataFile) {
     boolean replaceCurrentNodeWithTrueFilter = false;
     ColumnResolvedFilterInfo columnResolvedFilterInfo = null;
     if (null != filterExpressionResolverTree.getMsrColResolvedFilterInfo()) {
       columnResolvedFilterInfo = filterExpressionResolverTree.getMsrColResolvedFilterInfo();
       replaceCurrentNodeWithTrueFilter =
           checkIfFilterColumnIsCachedInDriver(columnResolvedFilterInfo, segmentProperties,
-              minMaxCacheColumns, true);
+              minMaxCacheColumns, true, isStreamDataFile);
     } else {
       columnResolvedFilterInfo = filterExpressionResolverTree.getDimColResolvedFilterInfo();
       if (!columnResolvedFilterInfo.getDimension().hasEncoding(Encoding.IMPLICIT)) {
         replaceCurrentNodeWithTrueFilter =
             checkIfFilterColumnIsCachedInDriver(columnResolvedFilterInfo, segmentProperties,
-                minMaxCacheColumns, false);
+                minMaxCacheColumns, false, isStreamDataFile);
       }
     }
     return replaceCurrentNodeWithTrueFilter;
@@ -352,7 +358,7 @@ public final class FilterUtil {
    */
   private static boolean checkIfFilterColumnIsCachedInDriver(
       ColumnResolvedFilterInfo columnResolvedFilterInfo, SegmentProperties segmentProperties,
-      List<CarbonColumn> minMaxCacheColumns, boolean isMeasure) {
+      List<CarbonColumn> minMaxCacheColumns, boolean isMeasure, boolean isStreamDataFile) {
     boolean replaceCurrentNodeWithTrueFilter = false;
     CarbonColumn columnFromCurrentBlock = null;
     if (isMeasure) {
@@ -377,8 +383,17 @@ public final class FilterUtil {
         // if columns to be cached are not specified then in that case all columns will be cached
         // and then the ordinal of column will be its index in the min/max byte array
         if (isMeasure) {
-          columnResolvedFilterInfo.setColumnIndexInMinMaxByteArray(
-              segmentProperties.getLastDimensionColOrdinal() + columnFromCurrentBlock.getOrdinal());
+          // when reading from a stream data file, the min/max column cache does not include
+          // complex columns, so 'segmentProperties.getLastDimensionColOrdinal()' cannot be used
+          // as the last dimension ordinal.
+          if (isStreamDataFile) {
+            columnResolvedFilterInfo.setColumnIndexInMinMaxByteArray(
+                segmentProperties.getDimensions().size() + columnFromCurrentBlock.getOrdinal());
+          } else {
+            columnResolvedFilterInfo.setColumnIndexInMinMaxByteArray(
+                segmentProperties.getLastDimensionColOrdinal() + columnFromCurrentBlock
+                .getOrdinal());
+          }
         } else {
           columnResolvedFilterInfo
               .setColumnIndexInMinMaxByteArray(columnFromCurrentBlock.getOrdinal());
@@ -1492,9 +1507,9 @@ public final class FilterUtil {
    */
   public static FilterExecuter getFilterExecuterTree(
       FilterResolverIntf filterExpressionResolverTree, SegmentProperties segmentProperties,
-      Map<Integer, GenericQueryType> complexDimensionInfoMap) {
+      Map<Integer, GenericQueryType> complexDimensionInfoMap, boolean isStreamDataFile) {
     return getFilterExecuterTree(filterExpressionResolverTree, segmentProperties,
-        complexDimensionInfoMap, null);
+        complexDimensionInfoMap, null, isStreamDataFile);
   }
 
   /**
@@ -1507,9 +1522,9 @@ public final class FilterUtil {
   public static FilterExecuter getFilterExecuterTree(
       FilterResolverIntf filterExpressionResolverTree, SegmentProperties segmentProperties,
       Map<Integer, GenericQueryType> complexDimensionInfoMap,
-      List<CarbonColumn> minMaxCacheColumns) {
+      List<CarbonColumn> minMaxCacheColumns, boolean isStreamDataFile) {
     return createFilterExecuterTree(filterExpressionResolverTree, segmentProperties,
-        complexDimensionInfoMap, minMaxCacheColumns);
+        complexDimensionInfoMap, minMaxCacheColumns, isStreamDataFile);
   }
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java b/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java
index c92a8a1..e8790ee 100644
--- a/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java
+++ b/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java
@@ -72,7 +72,7 @@ public class StreamPruner {
       SegmentProperties segmentProperties =
           new SegmentProperties(listOfColumns, columnCardinality);
       filterExecuter = FilterUtil.getFilterExecuterTree(
-          filterExp, segmentProperties, null, minMaxCacheColumns);
+          filterExp, segmentProperties, null, minMaxCacheColumns, false);
     }
   }
 
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
index d32afd9..d860229 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
@@ -135,7 +135,7 @@ public class MinMaxIndexDataMap extends CoarseGrainDataMap {
       }
     } else {
       FilterExecuter filterExecuter =
-          FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+          FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null, false);
       for (int blkIdx = 0; blkIdx < readMinMaxDataMap.length; blkIdx++) {
         for (int blkltIdx = 0; blkltIdx < readMinMaxDataMap[blkIdx].length; blkltIdx++) {
 
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
index 020af65..3ea65e5 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
@@ -236,8 +236,8 @@ public class StreamRecordReader extends RecordReader<Void, Object> {
     Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
 
     FilterResolverIntf resolverIntf = model.getDataMapFilter().getResolver();
-    filter =
-        FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties, complexDimensionInfoMap);
+    filter = FilterUtil.getFilterExecuterTree(
+        resolverIntf, segmentProperties, complexDimensionInfoMap, true);
     // for row filter, we need update column index
     FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
         carbonTable.getDimensionOrdinalMax());
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableQueryFilter.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableQueryFilter.scala
new file mode 100644
index 0000000..f47e18d
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableQueryFilter.scala
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata
+
+import java.io.PrintWriter
+import java.math.BigDecimal
+import java.net.{BindException, ServerSocket}
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestStreamingTableQueryFilter extends QueryTest with BeforeAndAfterAll {
+
+  private val spark = sqlContext.sparkSession
+  private val dataFilePath = s"$resourcesPath/streamSample_with_long_string.csv"
+  private val longStrValue = "abc" * 12000
+
+  override def beforeAll {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+    sql("DROP DATABASE IF EXISTS streaming_table_filter CASCADE")
+    sql("CREATE DATABASE streaming_table_filter")
+    sql("USE streaming_table_filter")
+
+    dropTable()
+    createTableWithComplexType(
+      tableName = "stream_filter", streaming = true, withBatchLoad = true)
+  }
+
+  override def afterAll {
+    dropTable()
+    sql("USE default")
+    sql("DROP DATABASE IF EXISTS streaming_table_filter CASCADE")
+  }
+
+  def dropTable(): Unit = {
+    sql("drop table if exists streaming_table_filter.stream_filter")
+  }
+
+  test("[CARBONDATA-3611] Fix failed when filter with measure columns on 
stream table when this stream table includes complex columns") {
+    executeStreamingIngest(
+      tableName = "stream_filter",
+      batchNums = 2,
+      rowNumsEachBatch = 25,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      handoffSize = 51200,
+      autoHandoff = false
+    )
+
+    // non-filter
+    val result = sql("select * from streaming_table_filter.stream_filter order 
by id, name").collect()
+    assert(result != null)
+    assert(result.length == 55)
+    // check one row of streaming data
+    assert(result(3).getString(1) == "name_4")
+    assert(result(3).getString(9) == ("4" + longStrValue))
+    // check one row of batch loading
+    assert(result(52).getInt(0) == 100000003)
+    assert(result(52).getString(1) == "batch_3")
+    assert(result(52).getString(9) == ("3" + longStrValue))
+    assert(result(52).getStruct(10).getInt(1) == 40)
+
+    // filter
+    checkAnswer(
+      sql("select * from streaming_table_filter.stream_filter where id = 1"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, 
Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), 
Row(wrap(Array("school_1", "school_11")), 1))))
+
+    checkAnswer(
+      sql("select * from streaming_table_filter.stream_filter where id > 49 
and id < 100000002"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 
80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue), 
Row(wrap(Array("school_50", "school_5050")), 50)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 
80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), 
Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from streaming_table_filter.stream_filter where id between 
50 and 100000001"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 
80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue), 
Row(wrap(Array("school_50", "school_5050")), 50)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 
80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), 
Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from streaming_table_filter.stream_filter where salary = 
490000.0 and percent = 80.01"),
+      Seq(Row(49, "name_49", "city_49", 490000.0, BigDecimal.valueOf(0.01), 
80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"), ("49" + longStrValue), 
Row(wrap(Array("school_49", "school_4949")), 49))))
+
+    checkAnswer(
+      sql("select * from streaming_table_filter.stream_filter where id > 20 
and salary = 300000.0 and file.age > 25"),
+      Seq(Row(30, "name_30", "city_30", 300000.0, BigDecimal.valueOf(0.01), 
80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"), ("30" + longStrValue), 
Row(wrap(Array("school_30", "school_3030")), 30))))
+  }
+
+  def createWriteSocketThread(
+      serverSocket: ServerSocket,
+      writeNums: Int,
+      rowNums: Int,
+      intervalSecond: Int): Thread = {
+    new Thread() {
+      override def run(): Unit = {
+        // wait for the client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to writeNums) {
+          // write 5 records per iteration
+          val stringBuilder = new StringBuilder()
+          for (_ <- 1 to rowNums) {
+            index = index + 1
+            stringBuilder.append(index.toString + ",name_" + index
+                                 + ",city_" + index + "," + (10000.00 * 
index).toString + ",0.01,80.01" +
+                                 ",1990-01-01,2010-01-01 10:01:01,2010-01-01 
10:01:01," +
+                                 index.toString() + ("abc" * 12000) +
+                                 ",school_" + index + ":school_" + index + 
index + "$" + index)
+            stringBuilder.append("\n")
+          }
+          socketWriter.append(stringBuilder.toString())
+          socketWriter.flush()
+          Thread.sleep(1000 * intervalSecond)
+        }
+        socketWriter.close()
+      }
+    }
+  }
+
+  def createSocketStreamingThread(
+      spark: SparkSession,
+      port: Int,
+      carbonTable: CarbonTable,
+      tableIdentifier: TableIdentifier,
+      intervalSecond: Int = 2,
+      handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
+      autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+  ): Thread = {
+    new Thread() {
+      override def run(): Unit = {
+        var qry: StreamingQuery = null
+        try {
+          import spark.implicits._
+          val readSocketDF = spark.readStream
+            .format("socket")
+            .option("host", "localhost")
+            .option("port", port)
+            .load().as[String]
+            .map(_.split(","))
+            .map { fields => {
+              val tmp = fields(10).split("\\$")
+              val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
+              StreamLongStrData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat,
+                  BigDecimal.valueOf(fields(4).toDouble), fields(5).toDouble,
+                  fields(6), fields(7), fields(8), fields(9), file)
+            } }
+
+          // Write data from socket stream to carbondata file
+          // repartition to simulate an empty partition when readSocketDF has only one row
+          qry = readSocketDF.repartition(2).writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime(s"$intervalSecond seconds"))
+            .option("checkpointLocation", 
CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
+            .option("dbName", tableIdentifier.database.get)
+            .option("tableName", tableIdentifier.table)
+            .option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
+            .option("timestampformat", 
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+            .option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff)
+            .start()
+          qry.awaitTermination()
+        } catch {
+          case ex: Throwable =>
+            LOGGER.error(ex.getMessage)
+            throw new Exception(ex.getMessage, ex)
+        } finally {
+          if (null != qry) {
+            qry.stop()
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * start the ingestion thread: write `rowNumsEachBatch` rows per batch, repeated `batchNums` times.
+   */
+  def executeStreamingIngest(
+      tableName: String,
+      batchNums: Int,
+      rowNumsEachBatch: Int,
+      intervalOfSource: Int,
+      intervalOfIngest: Int,
+      continueSeconds: Int,
+      handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
+      autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+  ): Unit = {
+    val identifier = new TableIdentifier(tableName, Option("streaming_table_filter"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetaStore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    var server: ServerSocket = null
+    try {
+      server = getServerSocket()
+      val thread1 = createWriteSocketThread(
+        serverSocket = server,
+        writeNums = batchNums,
+        rowNums = rowNumsEachBatch,
+        intervalSecond = intervalOfSource)
+      val thread2 = createSocketStreamingThread(
+        spark = spark,
+        port = server.getLocalPort,
+        carbonTable = carbonTable,
+        tableIdentifier = identifier,
+        intervalSecond = intervalOfIngest,
+        handoffSize = handoffSize,
+        autoHandoff = autoHandoff)
+      thread1.start()
+      thread2.start()
+      Thread.sleep(continueSeconds * 1000)
+      thread2.interrupt()
+      thread1.interrupt()
+    } finally {
+      if (null != server) {
+        server.close()
+      }
+    }
+  }
+
+  def createTableWithComplexType(
+      tableName: String,
+      streaming: Boolean,
+      withBatchLoad: Boolean): Unit = {
+    sql(
+      s"""
+         | CREATE TABLE streaming_table_filter.$tableName(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP,
+         | longstr STRING,
+         | file struct<school:array<string>, age:int>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
+         | 'sort_columns'='name', 'dictionary_include'='name,tax,percent,updated', 'LONG_STRING_COLUMNS'='longstr')
+         | """.stripMargin)
+
+    if (withBatchLoad) {
+      // batch loading 5 rows
+      executeBatchLoad(tableName)
+    }
+  }
+
+  def executeBatchLoad(tableName: String): Unit = {
+    sql(
+      s"LOAD DATA LOCAL INPATH '$dataFilePath' INTO TABLE 
streaming_table_filter.$tableName OPTIONS" +
+      "('HEADER'='true','COMPLEX_DELIMITER_LEVEL_1'='$', 
'COMPLEX_DELIMITER_LEVEL_2'=':')")
+  }
+
+  def wrap(array: Array[String]) = {
+    new mutable.WrappedArray.ofRef(array)
+  }
+
+  /**
+   * get a ServerSocket
+   * if the address is already in use, it retries with a new port number.
+   *
+   * @return ServerSocket
+   */
+  def getServerSocket(): ServerSocket = {
+    var port = 7071
+    var serverSocket: ServerSocket = null
+    var retry = false
+    do {
+      try {
+        retry = false
+        serverSocket = new ServerSocket(port)
+      } catch {
+        case ex: BindException =>
+          retry = true
+          port = port + 2
+          if (port >= 65535) {
+            throw ex
+          }
+      }
+    } while (retry)
+    serverSocket
+  }
+}
