[ 
https://issues.apache.org/jira/browse/DRILL-6381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647424#comment-16647424
 ] 

ASF GitHub Bot commented on DRILL-6381:
---------------------------------------

vdiravka commented on a change in pull request #1466: DRILL-6381: Add support 
for index based planning and execution
URL: https://github.com/apache/drill/pull/1466#discussion_r223652043
 
 

 ##########
 File path: 
contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
 ##########
 @@ -98,91 +114,181 @@
   private final boolean disableCountOptimization;
   private final boolean nonExistentColumnsProjection;
 
-  public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec,
-      MapRDBFormatPluginConfig formatPluginConfig,
-      List<SchemaPath> projectedColumns, FragmentContext context) {
+  protected final MapRDBSubScanSpec subScanSpec;
+  protected final MapRDBFormatPlugin formatPlugin;
+
+  protected OjaiValueWriter valueWriter;
+  protected DocumentReaderVectorWriter documentWriter;
+  protected int maxRecordsToRead = -1;
+
+  public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, 
MapRDBFormatPlugin formatPlugin,
+                                List<SchemaPath> projectedColumns, 
FragmentContext context, int maxRecords) {
+    this(subScanSpec, formatPlugin, projectedColumns, context);
+    this.maxRecordsToRead = maxRecords;
+  }
+
+  protected MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, 
MapRDBFormatPlugin formatPlugin,
+                                List<SchemaPath> projectedColumns, 
FragmentContext context) {
     buffer = context.getManagedBuffer();
-    projectedFields = null;
-    tableName = Preconditions.checkNotNull(subScanSpec, "MapRDB reader needs a 
sub-scan spec").getTableName();
-    documentReaderIterators = null;
-    includeId = false;
-    idOnly    = false;
+    final Path tablePath = new Path(Preconditions.checkNotNull(subScanSpec,
+      "MapRDB reader needs a sub-scan spec").getTableName());
+    this.subScanSpec = subScanSpec;
+    this.formatPlugin = formatPlugin;
+    final IndexDesc indexDesc = subScanSpec.getIndexDesc();
     byte[] serializedFilter = subScanSpec.getSerializedFilter();
     condition = null;
 
     if (serializedFilter != null) {
       condition = 
com.mapr.db.impl.ConditionImpl.parseFrom(ByteBufs.wrap(serializedFilter));
     }
 
-    disableCountOptimization = formatPluginConfig.disableCountOptimization();
+    disableCountOptimization = 
formatPlugin.getConfig().disableCountOptimization();
+    // Below call will set the scannedFields and includeId correctly
     setColumns(projectedColumns);
-    unionEnabled = 
context.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
-    readNumbersAsDouble = formatPluginConfig.isReadAllNumbersAsDouble();
-    allTextMode = formatPluginConfig.isAllTextMode();
-    ignoreSchemaChange = formatPluginConfig.isIgnoreSchemaChange();
-    disablePushdown = !formatPluginConfig.isEnablePushdown();
-    nonExistentColumnsProjection = 
formatPluginConfig.isNonExistentFieldSupport();
+    unionEnabled = 
context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+    readNumbersAsDouble = formatPlugin.getConfig().isReadAllNumbersAsDouble();
+    allTextMode = formatPlugin.getConfig().isAllTextMode();
+    ignoreSchemaChange = formatPlugin.getConfig().isIgnoreSchemaChange();
+    disablePushdown = !formatPlugin.getConfig().isEnablePushdown();
+    nonExistentColumnsProjection = 
formatPlugin.getConfig().isNonExistentFieldSupport();
+
+    // Do not use cached table handle for two reasons.
+    // cached table handles default timeout is 60 min after which those 
handles will become stale.
+    // Since execution can run for longer than 60 min, we want to get a new 
table handle and use it
+    // instead of the one from cache.
+    // Since we are setting some table options, we do not want to use shared 
handles.
+    //
+    // Call it here instead of setup since this will make sure it's called 
under correct UGI block when impersonation
+    // is enabled and table is used with and without views.
+    table = (indexDesc == null ? MapRDBImpl.getTable(tablePath) : 
MapRDBImpl.getIndexTable(indexDesc));
+
+    if (condition != null) {
+      logger.debug("Created record reader with query condition {}", 
condition.toString());
+    } else {
+      logger.debug("Created record reader with query condition NULL");
+    }
   }
 
   @Override
   protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> 
columns) {
     Set<SchemaPath> transformed = Sets.newLinkedHashSet();
+    Set<SchemaPath> encodedSchemaPathSet = Sets.newLinkedHashSet();
+
     if (disablePushdown) {
       transformed.add(SchemaPath.STAR_COLUMN);
       includeId = true;
-      return transformed;
-    }
+    } else {
+      if (isStarQuery()) {
+        transformed.add(SchemaPath.STAR_COLUMN);
+        includeId = true;
+        if (isSkipQuery() && !disableCountOptimization) {
+          // `SELECT COUNT(*)` query
+          idOnly = true;
+          scannedFields = ID_ONLY_PROJECTION;
+        }
+      } else {
+        Set<FieldPath> scannedFieldsSet = Sets.newTreeSet();
+        Set<FieldPath> projectedFieldsSet = null;
 
-    if (isStarQuery()) {
-      transformed.add(SchemaPath.STAR_COLUMN);
-      includeId = true;
-      if (isSkipQuery()) {
-       // `SELECT COUNT(*)` query
-       if (!disableCountOptimization) {
-          projectedFields = new FieldPath[1];
-          projectedFields[0] = ID_FIELD;
+        for (SchemaPath column : columns) {
+          if (EncodedSchemaPathSet.isEncodedSchemaPath(column)) {
+            encodedSchemaPathSet.add(column);
+          } else {
+            transformed.add(column);
+            if (!DOCUMENT_SCHEMA_PATH.equals(column)) {
+              FieldPath fp = getFieldPathForProjection(column);
+              scannedFieldsSet.add(fp);
+            } else {
+              projectWholeDocument = true;
+            }
+          }
+        }
+        if (projectWholeDocument) {
+          // we do not want to project the fields from the encoded field path 
list
+          // hence make a copy of the scannedFieldsSet here for projection.
+          projectedFieldsSet = new ImmutableSet.Builder<FieldPath>()
+              .addAll(scannedFieldsSet).build();
         }
-      }
-      return transformed;
-    }
 
-    Set<FieldPath> projectedFieldsSet = Sets.newTreeSet();
-    for (SchemaPath column : columns) {
-      if (column.getRootSegment().getPath().equalsIgnoreCase(ID_KEY)) {
-        includeId = true;
-        if (!disableCountOptimization) {
-          projectedFieldsSet.add(ID_FIELD);
+        if (encodedSchemaPathSet.size() > 0) {
+          Collection<SchemaPath> decodedSchemaPaths = 
EncodedSchemaPathSet.decode(encodedSchemaPathSet);
+          // now we look at the fields which are part of encoded field set and 
either
+          // add them to scanned set or clear the scanned set if all fields 
were requested.
+          for (SchemaPath column : decodedSchemaPaths) {
+            if (column.equals(SchemaPath.STAR_COLUMN)) {
+              includeId = true;
+              scannedFieldsSet.clear();
+              break;
+            }
+            scannedFieldsSet.add(getFieldPathForProjection(column));
+          }
         }
-      } else {
-        projectedFieldsSet.add(getFieldPathForProjection(column));
-      }
 
-      transformed.add(column);
-    }
+        if (scannedFieldsSet.size() > 0) {
+          if (includesIdField(scannedFieldsSet)) {
+            includeId = true;
+          }
+          scannedFields = scannedFieldsSet.toArray(new 
FieldPath[scannedFieldsSet.size()]);
+        }
 
-    if (projectedFieldsSet.size() > 0) {
-      projectedFields = projectedFieldsSet.toArray(new 
FieldPath[projectedFieldsSet.size()]);
-    }
+        if (disableCountOptimization) {
+          idOnly = (scannedFields == null);
+        }
 
-    if (disableCountOptimization) {
-      idOnly = (projectedFields == null);
-    }
+        if(projectWholeDocument) {
 
 Review comment:
   space

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add capability to do index based planning and execution
> -------------------------------------------------------
>
>                 Key: DRILL-6381
>                 URL: https://issues.apache.org/jira/browse/DRILL-6381
>             Project: Apache Drill
>          Issue Type: New Feature
>          Components: Execution - Relational Operators, Query Planning &amp; 
> Optimization
>            Reporter: Aman Sinha
>            Assignee: Aman Sinha
>            Priority: Major
>             Fix For: 1.15.0
>
>
> If the underlying data source supports indexes (primary and secondary 
> indexes), Drill should leverage those during planning and execution in order 
> to improve query performance.  
> On the planning side, Drill planner should be enhanced to provide an 
> abstraction layer which express the index metadata and statistics.  Further, 
> a cost-based index selection is needed to decide which index(es) are 
> suitable.  
> On the execution side, appropriate operator enhancements would be needed to 
> handle different categories of indexes such as covering, non-covering 
> indexes, taking into consideration the index data may not be co-located with 
> the primary table, i.e a global index.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to