Repository: drill
Updated Branches:
  refs/heads/master bd1d9c24f -> 2862beaf5

DRILL-3474: Add implicit file columns support

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3209886a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3209886a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3209886a

Branch: refs/heads/master
Commit: 3209886a8548eea4a2f74c059542672f8665b8d2
Parents: bd1d9c2
Author: Arina Ielchiieva <[email protected]>
Authored: Mon Apr 18 19:36:52 2016 +0300
Committer: Parth Chandra <[email protected]>
Committed: Sat Jun 18 17:02:58 2016 -0700

----------------------------------------------------------------------
 .../hive/HiveDrillNativeScanBatchCreator.java   |  34 ++-
 .../org/apache/drill/exec/ExecConstants.java    |  12 ++
 .../drill/exec/physical/impl/ScanBatch.java     |  88 ++++----
 .../impl/project/ProjectRecordBatch.java        |  15 +-
 .../server/options/SystemOptionManager.java     |   6 +-
 .../exec/store/ImplicitColumnExplorer.java      | 206 +++++++++++++++++++
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |  81 +++-----
 .../store/parquet/ParquetScanBatchCreator.java  |  81 +++-----
 .../exec/store/TestImplicitFileColumns.java     | 113 ++++++++++
 9 files changed, 461 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/3209886a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index ab321ba..a9575ba 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -20,10 +20,10 @@ package org.apache.drill.exec.store.hive;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Functions;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -67,17 +67,15 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
     final List<SchemaPath> columns = config.getColumns();
     final String partitionDesignator = context.getOptions()
         .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+    List<Map<String, String>> implicitColumns = Lists.newLinkedList();
+    boolean selectAllQuery = AbstractRecordReader.isStarQuery(columns);
     final boolean hasPartitions = (partitions != null && partitions.size() > 0);
     final List<String[]> partitionColumns = Lists.newArrayList();
     final List<Integer> selectedPartitionColumns = Lists.newArrayList();
     List<SchemaPath> newColumns = columns;
-    if (AbstractRecordReader.isStarQuery(columns)) {
-      for (int i = 0; i < table.getPartitionKeys().size(); i++) {
-        selectedPartitionColumns.add(i);
-      }
-    } else {
+    if (!selectAllQuery) {
       // Separate out the partition and non-partition columns. Non-partition columns are passed directly to the
       // ParquetRecordReader. Partition columns are passed to ScanBatch.
       newColumns = Lists.newArrayList();
@@ -86,7 +84,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
         Matcher m = pattern.matcher(column.getAsUnescapedPath());
         if (m.matches()) {
           selectedPartitionColumns.add(
-              Integer.parseInt(column.getAsUnescapedPath().toString().substring(partitionDesignator.length())));
+              Integer.parseInt(column.getAsUnescapedPath().substring(partitionDesignator.length())));
         } else {
           newColumns.add(column);
         }
@@ -103,6 +101,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
     // TODO: In future we can get this cache from Metadata cached on filesystem.
     final Map<String, ParquetMetadata> footerCache = Maps.newHashMap();
 
+    Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
     try {
       for (InputSplit split : splits) {
         final FileSplit fileSplit = (FileSplit) split;
@@ -128,10 +127,19 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
                 parquetMetadata,
                 newColumns)
         );
+        Map<String, String> implicitValues = Maps.newLinkedHashMap();
         if (hasPartitions) {
-          Partition p = partitions.get(currentPartitionIndex);
-          partitionColumns.add(p.getValues().toArray(new String[0]));
+          List<String> values = partitions.get(currentPartitionIndex).getValues();
+          for (int i = 0; i < values.size(); i++) {
+            if (selectAllQuery || selectedPartitionColumns.contains(i)) {
+              implicitValues.put(partitionDesignator + i, values.get(i));
+            }
+          }
+        }
+        implicitColumns.add(implicitValues);
+        if (implicitValues.size() > mapWithMaxColumns.size()) {
+          mapWithMaxColumns = implicitValues;
         }
       }
       currentPartitionIndex++;
@@ -141,6 +149,12 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
       throw new ExecutionSetupException("Failed to create RecordReaders. " + e.getMessage(), e);
     }
 
+    // all readers should have the same number of implicit columns, add missing ones with value null
+    mapWithMaxColumns = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
+    for (Map<String, String> map : implicitColumns) {
+      map.putAll(Maps.difference(map, mapWithMaxColumns).entriesOnlyOnRight());
+    }
+
     // If there are no readers created (which is possible when the table is empty or no row groups are matched),
     // create an empty RecordReader to output the schema
     if (readers.size() == 0) {
@@ -148,7 +162,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
           ImpersonationUtil.createProxyUgi(config.getUserName(), context.getQueryUserName())));
     }
 
-    return new ScanBatch(config, context, oContext, readers.iterator(), partitionColumns, selectedPartitionColumns);
+    return new ScanBatch(config, context, oContext, readers.iterator(), implicitColumns);
   }
 
   /**


http://git-wip-us.apache.org/repos/asf/drill/blob/3209886a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 6eb8a3a..0bc8a07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -149,6 +149,18 @@ public interface ExecConstants {
   String FILESYSTEM_PARTITION_COLUMN_LABEL = "drill.exec.storage.file.partition.column.label";
   OptionValidator FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR = new StringValidator(FILESYSTEM_PARTITION_COLUMN_LABEL, "dir");
 
+  /**
+   * Implicit file columns
+   */
+  String IMPLICIT_FILENAME_COLUMN_LABEL = "drill.exec.storage.implicit.filename.column.label";
+  OptionValidator IMPLICIT_FILENAME_COLUMN_LABEL_VALIDATOR = new StringValidator(IMPLICIT_FILENAME_COLUMN_LABEL, "filename");
+  String IMPLICIT_SUFFIX_COLUMN_LABEL = "drill.exec.storage.implicit.suffix.column.label";
+  OptionValidator IMPLICIT_SUFFIX_COLUMN_LABEL_VALIDATOR = new StringValidator(IMPLICIT_SUFFIX_COLUMN_LABEL, "suffix");
+  String IMPLICIT_FQN_COLUMN_LABEL = "drill.exec.storage.implicit.fqn.column.label";
+  OptionValidator IMPLICIT_FQN_COLUMN_LABEL_VALIDATOR = new StringValidator(IMPLICIT_FQN_COLUMN_LABEL, "fqn");
+  String IMPLICIT_FILEPATH_COLUMN_LABEL = "drill.exec.storage.implicit.filepath.column.label";
+  OptionValidator IMPLICIT_FILEPATH_COLUMN_LABEL_VALIDATOR = new StringValidator(IMPLICIT_FILEPATH_COLUMN_LABEL, "filepath");
+
   String JSON_READ_NUMBERS_AS_DOUBLE = "store.json.read_numbers_as_double";
   BooleanValidator JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR = new BooleanValidator(JSON_READ_NUMBERS_AS_DOUBLE, false);
http://git-wip-us.apache.org/repos/asf/drill/blob/3209886a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index c1cd469..43fabba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -29,7 +29,6 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
@@ -46,7 +45,6 @@ import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
@@ -56,7 +54,6 @@ import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -80,20 +77,16 @@ public class ScanBatch implements CloseableRecordBatch {
   private RecordReader currentReader;
   private BatchSchema schema;
   private final Mutator mutator = new Mutator();
-  private Iterator<String[]> partitionColumns;
-  private String[] partitionValues;
-  private List<ValueVector> partitionVectors;
-  private List<Integer> selectedPartitionColumns;
-  private String partitionColumnDesignator;
   private boolean done = false;
   private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
   private boolean hasReadNonEmptyFile = false;
-
+  private Map<String, ValueVector> implicitVectors;
+  private Iterator<Map<String, String>> implicitColumns;
+  private Map<String, String> implicitValues;
 
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
                    OperatorContext oContext, Iterator<RecordReader> readers,
-                   List<String[]> partitionColumns,
-                   List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
+                   List<Map<String, String>> implicitColumns) throws ExecutionSetupException {
     this.context = context;
     this.readers = readers;
     if (!readers.hasNext()) {
@@ -118,16 +111,10 @@ public class ScanBatch implements CloseableRecordBatch {
       }
       oContext.getStats().stopProcessing();
     }
-    this.partitionColumns = partitionColumns.iterator();
-    partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
-    this.selectedPartitionColumns = selectedPartitionColumns;
-
-    // TODO Remove null check after DRILL-2097 is resolved. That JIRA refers to test cases that do not initialize
-    // options; so labelValue = null.
-    final OptionValue labelValue = context.getOptions().getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
-    partitionColumnDesignator = labelValue == null ? "dir" : labelValue.string_val;
+    this.implicitColumns = implicitColumns.iterator();
+    this.implicitValues = this.implicitColumns.hasNext() ? this.implicitColumns.next() : null;
 
-    addPartitionVectors();
+    addImplicitVectors();
   }
 
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
@@ -135,7 +122,7 @@ public class ScanBatch implements CloseableRecordBatch {
       throws ExecutionSetupException {
     this(subScanConfig, context,
         context.newOperatorContext(subScanConfig),
-        readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
+        readers, Collections.<Map<String, String>> emptyList());
   }
 
   @Override
@@ -221,7 +208,7 @@ public class ScanBatch implements CloseableRecordBatch {
           currentReader.close();
           currentReader = readers.next();
-          partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
+          implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
           currentReader.setup(oContext, mutator);
           try {
             currentReader.allocate(fieldVectorMap);
@@ -230,8 +217,7 @@ public class ScanBatch implements CloseableRecordBatch {
             clearFieldVectorMap();
             return IterOutcome.OUT_OF_MEMORY;
           }
-          addPartitionVectors();
-
+          addImplicitVectors();
         } catch (ExecutionSetupException e) {
           this.context.fail(e);
           releaseAssets();
@@ -241,7 +227,7 @@ public class ScanBatch implements CloseableRecordBatch {
       // At this point, the current reader has read 1 or more rows.
       hasReadNonEmptyFile = true;
-      populatePartitionVectors();
+      populateImplicitVectors();
 
       for (VectorWrapper w : container) {
         w.getValueVector().getMutator().setValueCount(recordCount);
@@ -271,41 +257,43 @@ public class ScanBatch implements CloseableRecordBatch {
     }
   }
 
-  private void addPartitionVectors() throws ExecutionSetupException {
+  private void addImplicitVectors() throws ExecutionSetupException {
     try {
-      if (partitionVectors != null) {
-        for (ValueVector v : partitionVectors) {
+      if (implicitVectors != null) {
+        for (ValueVector v : implicitVectors.values()) {
           v.clear();
         }
       }
-      partitionVectors = Lists.newArrayList();
-      for (int i : selectedPartitionColumns) {
-        final MaterializedField field =
-            MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i).getAsUnescapedPath(),
-                Types.optional(MinorType.VARCHAR));
-        final ValueVector v = mutator.addField(field, NullableVarCharVector.class);
-        partitionVectors.add(v);
+      implicitVectors = Maps.newHashMap();
+
+      if (implicitValues != null) {
+        for (String column : implicitValues.keySet()) {
+          final MaterializedField field = MaterializedField.create(column, Types.optional(MinorType.VARCHAR));
+          final ValueVector v = mutator.addField(field, NullableVarCharVector.class);
+          implicitVectors.put(column, v);
+        }
       }
     } catch(SchemaChangeException e) {
       throw new ExecutionSetupException(e);
     }
   }
 
-  private void populatePartitionVectors() {
-    for (int index = 0; index < selectedPartitionColumns.size(); index++) {
-      final int i = selectedPartitionColumns.get(index);
-      final NullableVarCharVector v = (NullableVarCharVector) partitionVectors.get(index);
-      if (partitionValues.length > i) {
-        final String val = partitionValues[i];
-        AllocationHelper.allocate(v, recordCount, val.length());
-        final byte[] bytes = val.getBytes();
-        for (int j = 0; j < recordCount; j++) {
-          v.getMutator().setSafe(j, bytes, 0, bytes.length);
+  private void populateImplicitVectors() {
+    if (implicitValues != null) {
+      for (Map.Entry<String, String> entry : implicitValues.entrySet()) {
+        final NullableVarCharVector v = (NullableVarCharVector) implicitVectors.get(entry.getKey());
+        String val;
+        if ((val = entry.getValue()) != null) {
+          AllocationHelper.allocate(v, recordCount, val.length());
+          final byte[] bytes = val.getBytes();
+          for (int j = 0; j < recordCount; j++) {
+            v.getMutator().setSafe(j, bytes, 0, bytes.length);
+          }
+          v.getMutator().setValueCount(recordCount);
+        } else {
+          AllocationHelper.allocate(v, recordCount, 0);
+          v.getMutator().setValueCount(recordCount);
         }
-        v.getMutator().setValueCount(recordCount);
-      } else {
-        AllocationHelper.allocate(v, recordCount, 0);
-        v.getMutator().setValueCount(recordCount);
       }
     }
   }
@@ -418,7 +406,7 @@ public class ScanBatch implements CloseableRecordBatch {
   @Override
   public void close() throws Exception {
     container.clear();
-    for (final ValueVector v : partitionVectors) {
+    for (final ValueVector v : implicitVectors.values()) {
       v.clear();
     }
     fieldVectorMap.clear();
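
ScanBatch now receives one Map<String, String> of implicit values per reader and materializes each key as a nullable VARCHAR vector, so all readers in a scan must share the same key set. The scan creators in this commit guarantee that with a Guava idiom: take the widest map, null out its values, and merge the missing keys into every narrower map. A minimal, self-contained sketch of that padding step (the column names and values are hypothetical):

    import com.google.common.base.Functions;
    import com.google.common.collect.Maps;
    import java.util.Map;

    public class NullPaddingDemo {
      public static void main(String[] args) {
        Map<String, String> withPartition = Maps.newLinkedHashMap();
        withPartition.put("dir0", "2016");          // reader under a subdirectory
        Map<String, String> withoutPartition = Maps.newLinkedHashMap(); // reader at the root

        // Widest map with every value replaced by null.
        Map<String, String> template =
            Maps.transformValues(withPartition, Functions.constant((String) null));

        // Add the keys the narrower map is missing, valued null.
        withoutPartition.putAll(Maps.difference(withoutPartition, template).entriesOnlyOnRight());

        System.out.println(withoutPartition); // prints {dir0=null}
      }
    }
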
http://git-wip-us.apache.org/repos/asf/drill/blob/3209886a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4ad5b8b..7892f75 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -60,6 +60,7 @@ import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.store.ImplicitColumnExplorer;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -73,7 +74,6 @@ import com.google.common.collect.Maps;
 
 public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
-
   private Projector projector;
   private List<ValueVector> allocationVectors;
   private List<ComplexWriter> complexWriters;
@@ -351,6 +351,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
           if (name == EMPTY_STRING) {
             continue;
           }
+
+          if (isImplicitFileColumn(vvIn)) {
+            continue;
+          }
+
           final FieldReference ref = new FieldReference(name);
           final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref.getAsNamePart().getName(), vvIn.getField().getType()), callBack);
           final TransferPair tp = vvIn.makeTransferPair(vvOut);
@@ -369,6 +374,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
             continue;
           }
 
+          if (isImplicitFileColumn(vvIn)) {
+            continue;
+          }
+
           final LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incoming, collector, context.getFunctionRegistry() );
           if (collector.hasErrors()) {
             throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
@@ -485,6 +494,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     }
   }
 
+  private boolean isImplicitFileColumn(ValueVector vvIn) {
+    return ImplicitColumnExplorer.initImplicitFileColumns(context.getOptions()).get(vvIn.getField().getName()) != null;
+  }
+
   private List<NamedExpression> getExpressionList() {
     if (popConfig.getExprs() != null) {
       return popConfig.getExprs();


http://git-wip-us.apache.org/repos/asf/drill/blob/3209886a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 579276e..119de98 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -137,7 +137,11 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       ClassTransformer.SCALAR_REPLACEMENT_VALIDATOR,
       ExecConstants.ENABLE_NEW_TEXT_READER,
       ExecConstants.ENABLE_BULK_LOAD_TABLE_LIST,
-      ExecConstants.WEB_LOGS_MAX_LINES_VALIDATOR
+      ExecConstants.WEB_LOGS_MAX_LINES_VALIDATOR,
+      ExecConstants.IMPLICIT_FILENAME_COLUMN_LABEL_VALIDATOR,
+      ExecConstants.IMPLICIT_SUFFIX_COLUMN_LABEL_VALIDATOR,
+      ExecConstants.IMPLICIT_FQN_COLUMN_LABEL_VALIDATOR,
+      ExecConstants.IMPLICIT_FILEPATH_COLUMN_LABEL_VALIDATOR
     };
     final Map<String, OptionValidator> tmp = new HashMap<>();
     for (final OptionValidator validator : validators) {


http://git-wip-us.apache.org/repos/asf/drill/blob/3209886a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnExplorer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnExplorer.java
new file mode 100644
index 0000000..94a0dca
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnExplorer.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.map.CaseInsensitiveMap;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ImplicitColumnExplorer {
+
+  private final String partitionDesignator;
+  private final List<SchemaPath> columns;
+  private final boolean selectAllColumns;
+  private final List<Integer> selectedPartitionColumns;
+  private final List<SchemaPath> tableColumns;
+  private final Map<String, ImplicitFileColumns> allImplicitColumns;
+  private final Map<String, ImplicitFileColumns> selectedImplicitColumns;
+
+
+  /**
+   * Helper class that encapsulates the logic for sorting out columns
+   * between actual table columns, partition columns and implicit file columns.
+   * Also populates a map with implicit column names as keys and their values.
+   */
+  public ImplicitColumnExplorer(FragmentContext context, List<SchemaPath> columns) {
+    this.partitionDesignator = context.getOptions().getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
+    this.columns = columns;
+    this.selectAllColumns = columns != null && AbstractRecordReader.isStarQuery(columns);
+    this.selectedPartitionColumns = Lists.newArrayList();
+    this.tableColumns = Lists.newArrayList();
+    this.allImplicitColumns = initImplicitFileColumns(context.getOptions());
+    this.selectedImplicitColumns = CaseInsensitiveMap.newHashMap();
+
+    init();
+  }
+
+  /**
+   * Creates a case-insensitive map with implicit file column names as keys and the matching ImplicitFileColumns enum constants as values
+   */
+  public static Map<String, ImplicitFileColumns> initImplicitFileColumns(OptionManager optionManager) {
+    Map<String, ImplicitFileColumns> map = CaseInsensitiveMap.newHashMap();
+    for (ImplicitFileColumns e : ImplicitFileColumns.values()) {
+      OptionValue optionValue;
+      if ((optionValue = optionManager.getOption(e.name)) != null) {
+        map.put(optionValue.string_val, e);
+      }
+    }
+    return map;
+  }
+
+  /**
+   * Compares the selection root and the actual file path to determine partition column values.
+   * Adds implicit file columns according to the columns list.
+   *
+   * @return map with column names as keys and their values
+   */
+  public Map<String, String> populateImplicitColumns(FileWork work, String selectionRoot) {
+    Map<String, String> implicitValues = Maps.newLinkedHashMap();
+    if (selectionRoot != null) {
+      String[] r = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString().split("/");
+      Path path = Path.getPathWithoutSchemeAndAuthority(new Path(work.getPath()));
+      String[] p = path.toString().split("/");
+      if (p.length > r.length) {
+        String[] q = ArrayUtils.subarray(p, r.length, p.length - 1);
+        for (int a = 0; a < q.length; a++) {
+          if (selectAllColumns || selectedPartitionColumns.contains(a)) {
+            implicitValues.put(partitionDesignator + a, q[a]);
+          }
+        }
+      }
+      //add implicit file columns
+      for (Map.Entry<String, ImplicitFileColumns> entry : selectedImplicitColumns.entrySet()) {
+        implicitValues.put(entry.getKey(), entry.getValue().getValue(path));
+      }
+    }
+    return implicitValues;
+  }
+
+  public boolean isSelectAllColumns() {
+    return selectAllColumns;
+  }
+
+  public List<SchemaPath> getTableColumns() {
+    return tableColumns;
+  }
+
+  /**
+   * If this is not a select-all query, sorts the columns into three categories:
+   * 1. table columns
+   * 2. partition columns
+   * 3. implicit file columns
+   */
+  private void init() {
+    if (selectAllColumns) {
+      selectedImplicitColumns.putAll(allImplicitColumns);
+    } else {
+      Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator));
+      for (SchemaPath column : columns) {
+        String path = column.getAsUnescapedPath();
+        Matcher m = pattern.matcher(path);
+        if (m.matches()) {
+          selectedPartitionColumns.add(Integer.parseInt(path.substring(partitionDesignator.length())));
+        } else if (allImplicitColumns.get(path) != null) {
+          selectedImplicitColumns.put(path, allImplicitColumns.get(path));
+        } else {
+          tableColumns.add(column);
+        }
+      }
+
+      // We must make sure to pass a table column (not to be confused with a partition column) to the underlying record
+      // reader.
+      if (tableColumns.size() == 0) {
+        tableColumns.add(AbstractRecordReader.STAR_COLUMN);
+      }
+    }
+  }
+
+  /**
+   * Columns that provide information about where the file data comes from.
+   * The columns are implicit, so they must be referenced explicitly in a query.
+   */
+  public enum ImplicitFileColumns {
+
+    /**
+     * Fully qualified name: the full path to the file, including the file name
+     */
+    FQN (ExecConstants.IMPLICIT_FQN_COLUMN_LABEL) {
+      @Override
+      public String getValue(Path path) {
+        return path.toString();
+      }
+    },
+
+    /**
+     * Full path to the file, without the file name
+     */
+    FILEPATH (ExecConstants.IMPLICIT_FILEPATH_COLUMN_LABEL) {
+      @Override
+      public String getValue(Path path) {
+        return path.getParent().toString();
+      }
+    },
+
+    /**
+     * File name with extension, without the path
+     */
+    FILENAME (ExecConstants.IMPLICIT_FILENAME_COLUMN_LABEL) {
+      @Override
+      public String getValue(Path path) {
+        return path.getName();
+      }
+    },
+
+    /**
+     * File suffix (without the leading dot)
+     */
+    SUFFIX (ExecConstants.IMPLICIT_SUFFIX_COLUMN_LABEL) {
+      @Override
+      public String getValue(Path path) {
+        return Files.getFileExtension(path.getName());
+      }
+    };
+
+    String name;
+
+    ImplicitFileColumns(String name) {
+      this.name = name;
+    }
+
+    /**
+     * Calculates the value of this implicit file column from the file path
+     */
+    public abstract String getValue(Path path);
+
+  }
+
+}
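
Each enum constant above derives its value from the file's org.apache.hadoop.fs.Path. A self-contained sketch of the four derivations for a hypothetical path (requires hadoop-common and Guava on the classpath, mirroring the getValue implementations above):

    import com.google.common.io.Files;
    import org.apache.hadoop.fs.Path;

    public class ImplicitValuesDemo {
      public static void main(String[] args) {
        Path path = new Path("/data/logs/2016/events.csv"); // hypothetical file
        System.out.println(path);                                   // fqn:      /data/logs/2016/events.csv
        System.out.println(path.getParent());                       // filepath: /data/logs/2016
        System.out.println(path.getName());                         // filename: events.csv
        System.out.println(Files.getFileExtension(path.getName())); // suffix:   csv
      }
    }
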
http://git-wip-us.apache.org/repos/asf/drill/blob/3209886a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index ec3cae8..5881d33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -19,16 +19,15 @@ package org.apache.drill.exec.store.dfs.easy;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
-import org.apache.commons.lang3.ArrayUtils;
+import com.google.common.base.Functions;
+import com.google.common.collect.Maps;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
@@ -42,7 +41,7 @@ import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.ImplicitColumnExplorer;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
@@ -53,7 +52,6 @@ import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.schedule.CompleteFileWork;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -125,41 +123,14 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
       List<SchemaPath> columns, String userName) throws ExecutionSetupException;
 
   CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
-    String partitionDesignator = context.getOptions()
-      .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
-    List<SchemaPath> columns = scan.getColumns();
-    List<RecordReader> readers = Lists.newArrayList();
-    List<String[]> partitionColumns = Lists.newArrayList();
-    List<Integer> selectedPartitionColumns = Lists.newArrayList();
-    boolean selectAllColumns = false;
-
-    if (columns == null || columns.size() == 0 || AbstractRecordReader.isStarQuery(columns)) {
-      selectAllColumns = true;
-    } else {
-      List<SchemaPath> newColumns = Lists.newArrayList();
-      Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator));
-      for (SchemaPath column : columns) {
-        Matcher m = pattern.matcher(column.getAsUnescapedPath());
-        if (m.matches()) {
-          selectedPartitionColumns.add(Integer.parseInt(column.getAsUnescapedPath().toString().substring(partitionDesignator.length())));
-        } else {
-          newColumns.add(column);
-        }
-      }
+    final ImplicitColumnExplorer columnExplorer = new ImplicitColumnExplorer(context, scan.getColumns());
 
-      // We must make sure to pass a table column(not to be confused with partition column) to the underlying record
-      // reader.
-      if (newColumns.size()==0) {
-        newColumns.add(AbstractRecordReader.STAR_COLUMN);
-      }
-      // Create a new sub scan object with the new set of columns;
-      EasySubScan newScan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
-          newColumns, scan.getSelectionRoot());
-      newScan.setOperatorId(scan.getOperatorId());
-      scan = newScan;
+    if (!columnExplorer.isSelectAllColumns()) {
+      scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
+          columnExplorer.getTableColumns(), scan.getSelectionRoot());
+      scan.setOperatorId(scan.getOperatorId());
     }
 
-    int numParts = 0;
     OperatorContext oContext = context.newOperatorContext(scan);
     final DrillFileSystem dfs;
     try {
@@ -168,30 +139,26 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
       throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
     }
 
-    for(FileWork work : scan.getWorkUnits()){
-      readers.add(getRecordReader(context, dfs, work, scan.getColumns(), scan.getUserName()));
-      if (scan.getSelectionRoot() != null) {
-        String[] r = Path.getPathWithoutSchemeAndAuthority(new Path(scan.getSelectionRoot())).toString().split("/");
-        String[] p = Path.getPathWithoutSchemeAndAuthority(new Path(work.getPath())).toString().split("/");
-        if (p.length > r.length) {
-          String[] q = ArrayUtils.subarray(p, r.length, p.length - 1);
-          partitionColumns.add(q);
-          numParts = Math.max(numParts, q.length);
-        } else {
-          partitionColumns.add(new String[] {});
-        }
-      } else {
-        partitionColumns.add(new String[] {});
+    List<RecordReader> readers = Lists.newArrayList();
+    List<Map<String, String>> implicitColumns = Lists.newArrayList();
+    Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
+    for(FileWork work : scan.getWorkUnits()) {
+      RecordReader recordReader = getRecordReader(context, dfs, work, scan.getColumns(), scan.getUserName());
+      readers.add(recordReader);
+      Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(work, scan.getSelectionRoot());
+      implicitColumns.add(implicitValues);
+      if (implicitValues.size() > mapWithMaxColumns.size()) {
+        mapWithMaxColumns = implicitValues;
       }
     }
 
-    if (selectAllColumns) {
-      for (int i = 0; i < numParts; i++) {
-        selectedPartitionColumns.add(i);
-      }
+    // all readers should have the same number of implicit columns, add missing ones with value null
+    Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
+    for (Map<String, String> map : implicitColumns) {
+      map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
     }
 
-    return new ScanBatch(scan, context, oContext, readers.iterator(), partitionColumns, selectedPartitionColumns);
+    return new ScanBatch(scan, context, oContext, readers.iterator(), implicitColumns);
   }
 
   public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException;
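
getReaderBatch no longer splits the selection root and file path inline; ImplicitColumnExplorer.populateImplicitColumns performs the same arithmetic for every format. A minimal sketch of that dir0/dir1 derivation with hypothetical paths (scheme and authority already stripped, as Path.getPathWithoutSchemeAndAuthority does in the real code):

    import org.apache.commons.lang3.ArrayUtils;

    public class PartitionDirsDemo {
      public static void main(String[] args) {
        String[] root = "/data/logs".split("/");
        String[] file = "/data/logs/2016/04/events.csv".split("/");
        if (file.length > root.length) {
          // Directories between the selection root and the file name become dir0, dir1, ...
          String[] dirs = ArrayUtils.subarray(file, root.length, file.length - 1);
          for (int i = 0; i < dirs.length; i++) {
            System.out.println("dir" + i + " = " + dirs[i]); // dir0 = 2016, dir1 = 04
          }
        }
      }
    }
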
http://git-wip-us.apache.org/repos/asf/drill/blob/3209886a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index c730bc9..4d4719b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -18,25 +18,21 @@ package org.apache.drill.exec.store.parquet;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
+import com.google.common.base.Functions;
 import com.google.common.base.Stopwatch;
-import org.apache.commons.lang3.ArrayUtils;
+import com.google.common.collect.Maps;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.impl.BatchCreator;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.ImplicitColumnExplorer;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
@@ -65,33 +61,14 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
   public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children)
       throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
-    String partitionDesignator = context.getOptions()
-      .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
-    List<SchemaPath> columns = rowGroupScan.getColumns();
-    List<RecordReader> readers = Lists.newArrayList();
     OperatorContext oContext = context.newOperatorContext(rowGroupScan);
-    List<String[]> partitionColumns = Lists.newArrayList();
-    List<Integer> selectedPartitionColumns = Lists.newArrayList();
-    boolean selectAllColumns = AbstractRecordReader.isStarQuery(columns);
-
-    List<SchemaPath> newColumns = columns;
-    if (!selectAllColumns) {
-      newColumns = Lists.newArrayList();
-      Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator));
-      for (SchemaPath column : columns) {
-        Matcher m = pattern.matcher(column.getAsUnescapedPath());
-        if (m.matches()) {
-          selectedPartitionColumns.add(Integer.parseInt(column.getAsUnescapedPath().toString().substring(partitionDesignator.length())));
-        } else {
-          newColumns.add(column);
-        }
-      }
-      final int id = rowGroupScan.getOperatorId();
-      // Create the new row group scan with the new columns
+    final ImplicitColumnExplorer columnExplorer = new ImplicitColumnExplorer(context, rowGroupScan.getColumns());
+
+    if (!columnExplorer.isSelectAllColumns()) {
       rowGroupScan = new ParquetRowGroupScan(rowGroupScan.getUserName(), rowGroupScan.getStorageEngine(),
-          rowGroupScan.getRowGroupReadEntries(), newColumns, rowGroupScan.getSelectionRoot());
-      rowGroupScan.setOperatorId(id);
+          rowGroupScan.getRowGroupReadEntries(), columnExplorer.getTableColumns(), rowGroupScan.getSelectionRoot());
+      rowGroupScan.setOperatorId(rowGroupScan.getOperatorId());
     }
 
     DrillFileSystem fs;
@@ -106,8 +83,10 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
     conf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
 
     // keep footers in a map to avoid re-reading them
-    Map<String, ParquetMetadata> footers = new HashMap<String, ParquetMetadata>();
-    int numParts = 0;
+    Map<String, ParquetMetadata> footers = Maps.newHashMap();
+    List<RecordReader> readers = Lists.newArrayList();
+    List<Map<String, String>> implicitColumns = Lists.newArrayList();
+    Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
     for(RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
       /*
      Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
@@ -118,7 +97,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
       */
       try {
         Stopwatch timer = Stopwatch.createUnstarted();
-        if ( ! footers.containsKey(e.getPath())){
+        if (!footers.containsKey(e.getPath())){
           timer.start();
           ParquetMetadata footer = ParquetFileReader.readFooter(conf, new Path(e.getPath()));
           long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
@@ -138,37 +117,27 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
           );
         } else {
           ParquetMetadata footer = footers.get(e.getPath());
-          readers.add(new DrillParquetReader(context, footer, e, newColumns, fs));
+          readers.add(new DrillParquetReader(context, footer, e, columnExplorer.getTableColumns(), fs));
         }
-        if (rowGroupScan.getSelectionRoot() != null) {
-          String[] r = Path.getPathWithoutSchemeAndAuthority(new Path(rowGroupScan.getSelectionRoot())).toString().split("/");
-          String[] p = Path.getPathWithoutSchemeAndAuthority(new Path(e.getPath())).toString().split("/");
-          if (p.length > r.length) {
-            String[] q = ArrayUtils.subarray(p, r.length, p.length - 1);
-            partitionColumns.add(q);
-            numParts = Math.max(numParts, q.length);
-          } else {
-            partitionColumns.add(new String[] {});
-          }
-        } else {
-          partitionColumns.add(new String[] {});
+
+        Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(e, rowGroupScan.getSelectionRoot());
+        implicitColumns.add(implicitValues);
+        if (implicitValues.size() > mapWithMaxColumns.size()) {
+          mapWithMaxColumns = implicitValues;
         }
+
       } catch (IOException e1) {
         throw new ExecutionSetupException(e1);
       }
     }
 
-    if (selectAllColumns) {
-      for (int i = 0; i < numParts; i++) {
-        selectedPartitionColumns.add(i);
-      }
+    // all readers should have the same number of implicit columns, add missing ones with value null
+    Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
+    for (Map<String, String> map : implicitColumns) {
+      map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
     }
 
-    ScanBatch s =
-        new ScanBatch(rowGroupScan, context, oContext, readers.iterator(), partitionColumns, selectedPartitionColumns);
-
-
-    return s;
+    return new ScanBatch(rowGroupScan, context, oContext, readers.iterator(), implicitColumns);
   }
 
   private static boolean isComplex(ParquetMetadata footer) {


http://git-wip-us.apache.org/repos/asf/drill/blob/3209886a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java
new file mode 100644
index 0000000..6900da9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestImplicitFileColumns.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.util.Text;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+public class TestImplicitFileColumns extends BaseTestQuery {
+
+  public static final String MAIN = "main";
+  public static final String NESTED = "nested";
+  public static final String CSV = "csv";
+
+  private static final JsonStringArrayList<Text> mainColumnValues = new JsonStringArrayList<Text>() {{
+    add(new Text(MAIN));
+  }};
+  private static final JsonStringArrayList<Text> nestedColumnValues = new JsonStringArrayList<Text>() {{
+    add(new Text(NESTED));
+  }};
+
+  @Rule
+  public TemporaryFolder testFolder = new TemporaryFolder();
+
+  private File mainFile;
+  private File nestedFolder;
+  private File nestedFile;
+
+  @Before
+  public void setup() throws Exception {
+    mainFile = testFolder.newFile(MAIN + "." + CSV);
+    Files.write(MAIN, mainFile, Charsets.UTF_8);
+    nestedFolder = testFolder.newFolder(NESTED);
+    nestedFile = new File(nestedFolder, NESTED + "." + CSV);
+    Files.write(NESTED, nestedFile, Charsets.UTF_8);
+  }
+
+  @Test
+  public void testImplicitColumns() throws Exception {
+    testBuilder()
+        .sqlQuery("select *, filename, suffix, fqn, filepath from dfs.`" + testFolder.getRoot().getPath() + "` order by filename")
+        .ordered()
+        .baselineColumns("columns", "dir0", "filename", "suffix", "fqn", "filepath")
+        .baselineValues(mainColumnValues, null, mainFile.getName(), CSV, new Path(mainFile.getPath()).toString(), new Path(mainFile.getParent()).toString())
+        .baselineValues(nestedColumnValues, NESTED, nestedFile.getName(), CSV, new Path(nestedFile.getPath()).toString(), new Path(nestedFile.getParent()).toString())
+        .go();
+  }
+
+  @Test
+  public void testImplicitColumnInWhereClause() throws Exception {
+    testBuilder()
+        .sqlQuery("select * from dfs.`%s` where filename = '%s'", nestedFolder.getPath(), nestedFile.getName())
+        .unOrdered()
+        .baselineColumns("columns")
+        .baselineValues(nestedColumnValues)
+        .go();
+  }
+
+  @Test
+  public void testImplicitColumnAlone() throws Exception {
+    testBuilder()
+        .sqlQuery("select filename from dfs.`" + nestedFolder.getPath() + "`")
+        .unOrdered()
+        .baselineColumns("filename")
+        .baselineValues(nestedFile.getName())
+        .go();
+  }
+
+  @Test
+  public void testImplicitColumnWithTableColumns() throws Exception {
+    testBuilder()
+        .sqlQuery("select columns, filename from dfs.`" + nestedFolder.getPath() + "`")
+        .unOrdered()
+        .baselineColumns("columns", "filename")
+        .baselineValues(nestedColumnValues, nestedFile.getName())
+        .go();
+  }
+
+  @Test
+  public void testImplicitColumnsForParquet() throws Exception {
+    testBuilder()
+        .sqlQuery("select filename, suffix from cp.`tpch/region.parquet` limit 1")
+        .unOrdered()
+        .baselineColumns("filename", "suffix")
+        .baselineValues("region.parquet", "parquet")
+        .go();
+  }
+
+}