vdiravka commented on a change in pull request #1466: DRILL-6381: Add support for index based planning and execution URL: https://github.com/apache/drill/pull/1466#discussion_r223652043
########## File path: contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java ########## @@ -98,91 +114,181 @@ private final boolean disableCountOptimization; private final boolean nonExistentColumnsProjection; - public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, - MapRDBFormatPluginConfig formatPluginConfig, - List<SchemaPath> projectedColumns, FragmentContext context) { + protected final MapRDBSubScanSpec subScanSpec; + protected final MapRDBFormatPlugin formatPlugin; + + protected OjaiValueWriter valueWriter; + protected DocumentReaderVectorWriter documentWriter; + protected int maxRecordsToRead = -1; + + public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, MapRDBFormatPlugin formatPlugin, + List<SchemaPath> projectedColumns, FragmentContext context, int maxRecords) { + this(subScanSpec, formatPlugin, projectedColumns, context); + this.maxRecordsToRead = maxRecords; + } + + protected MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, MapRDBFormatPlugin formatPlugin, + List<SchemaPath> projectedColumns, FragmentContext context) { buffer = context.getManagedBuffer(); - projectedFields = null; - tableName = Preconditions.checkNotNull(subScanSpec, "MapRDB reader needs a sub-scan spec").getTableName(); - documentReaderIterators = null; - includeId = false; - idOnly = false; + final Path tablePath = new Path(Preconditions.checkNotNull(subScanSpec, + "MapRDB reader needs a sub-scan spec").getTableName()); + this.subScanSpec = subScanSpec; + this.formatPlugin = formatPlugin; + final IndexDesc indexDesc = subScanSpec.getIndexDesc(); byte[] serializedFilter = subScanSpec.getSerializedFilter(); condition = null; if (serializedFilter != null) { condition = com.mapr.db.impl.ConditionImpl.parseFrom(ByteBufs.wrap(serializedFilter)); } - disableCountOptimization = formatPluginConfig.disableCountOptimization(); + disableCountOptimization = formatPlugin.getConfig().disableCountOptimization(); + // Below call will set the scannedFields and includeId correctly setColumns(projectedColumns); - unionEnabled = context.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY); - readNumbersAsDouble = formatPluginConfig.isReadAllNumbersAsDouble(); - allTextMode = formatPluginConfig.isAllTextMode(); - ignoreSchemaChange = formatPluginConfig.isIgnoreSchemaChange(); - disablePushdown = !formatPluginConfig.isEnablePushdown(); - nonExistentColumnsProjection = formatPluginConfig.isNonExistentFieldSupport(); + unionEnabled = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE); + readNumbersAsDouble = formatPlugin.getConfig().isReadAllNumbersAsDouble(); + allTextMode = formatPlugin.getConfig().isAllTextMode(); + ignoreSchemaChange = formatPlugin.getConfig().isIgnoreSchemaChange(); + disablePushdown = !formatPlugin.getConfig().isEnablePushdown(); + nonExistentColumnsProjection = formatPlugin.getConfig().isNonExistentFieldSupport(); + + // Do not use cached table handle for two reasons. + // cached table handles default timeout is 60 min after which those handles will become stale. + // Since execution can run for longer than 60 min, we want to get a new table handle and use it + // instead of the one from cache. + // Since we are setting some table options, we do not want to use shared handles. + // + // Call it here instead of setup since this will make sure it's called under correct UGI block when impersonation + // is enabled and table is used with and without views. + table = (indexDesc == null ? MapRDBImpl.getTable(tablePath) : MapRDBImpl.getIndexTable(indexDesc)); + + if (condition != null) { + logger.debug("Created record reader with query condition {}", condition.toString()); + } else { + logger.debug("Created record reader with query condition NULL"); + } } @Override protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> columns) { Set<SchemaPath> transformed = Sets.newLinkedHashSet(); + Set<SchemaPath> encodedSchemaPathSet = Sets.newLinkedHashSet(); + if (disablePushdown) { transformed.add(SchemaPath.STAR_COLUMN); includeId = true; - return transformed; - } + } else { + if (isStarQuery()) { + transformed.add(SchemaPath.STAR_COLUMN); + includeId = true; + if (isSkipQuery() && !disableCountOptimization) { + // `SELECT COUNT(*)` query + idOnly = true; + scannedFields = ID_ONLY_PROJECTION; + } + } else { + Set<FieldPath> scannedFieldsSet = Sets.newTreeSet(); + Set<FieldPath> projectedFieldsSet = null; - if (isStarQuery()) { - transformed.add(SchemaPath.STAR_COLUMN); - includeId = true; - if (isSkipQuery()) { - // `SELECT COUNT(*)` query - if (!disableCountOptimization) { - projectedFields = new FieldPath[1]; - projectedFields[0] = ID_FIELD; + for (SchemaPath column : columns) { + if (EncodedSchemaPathSet.isEncodedSchemaPath(column)) { + encodedSchemaPathSet.add(column); + } else { + transformed.add(column); + if (!DOCUMENT_SCHEMA_PATH.equals(column)) { + FieldPath fp = getFieldPathForProjection(column); + scannedFieldsSet.add(fp); + } else { + projectWholeDocument = true; + } + } + } + if (projectWholeDocument) { + // we do not want to project the fields from the encoded field path list + // hence make a copy of the scannedFieldsSet here for projection. + projectedFieldsSet = new ImmutableSet.Builder<FieldPath>() + .addAll(scannedFieldsSet).build(); } - } - return transformed; - } - Set<FieldPath> projectedFieldsSet = Sets.newTreeSet(); - for (SchemaPath column : columns) { - if (column.getRootSegment().getPath().equalsIgnoreCase(ID_KEY)) { - includeId = true; - if (!disableCountOptimization) { - projectedFieldsSet.add(ID_FIELD); + if (encodedSchemaPathSet.size() > 0) { + Collection<SchemaPath> decodedSchemaPaths = EncodedSchemaPathSet.decode(encodedSchemaPathSet); + // now we look at the fields which are part of encoded field set and either + // add them to scanned set or clear the scanned set if all fields were requested. + for (SchemaPath column : decodedSchemaPaths) { + if (column.equals(SchemaPath.STAR_COLUMN)) { + includeId = true; + scannedFieldsSet.clear(); + break; + } + scannedFieldsSet.add(getFieldPathForProjection(column)); + } } - } else { - projectedFieldsSet.add(getFieldPathForProjection(column)); - } - transformed.add(column); - } + if (scannedFieldsSet.size() > 0) { + if (includesIdField(scannedFieldsSet)) { + includeId = true; + } + scannedFields = scannedFieldsSet.toArray(new FieldPath[scannedFieldsSet.size()]); + } - if (projectedFieldsSet.size() > 0) { - projectedFields = projectedFieldsSet.toArray(new FieldPath[projectedFieldsSet.size()]); - } + if (disableCountOptimization) { + idOnly = (scannedFields == null); + } - if (disableCountOptimization) { - idOnly = (projectedFields == null); - } + if(projectWholeDocument) { Review comment: space ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services