This is an automated email from the ASF dual-hosted git repository. tmarshall pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 3554b0752d4be142dabb82b6b751323cde5323d8 Author: Steve Carlin <[email protected]> AuthorDate: Sat Feb 20 07:10:43 2021 -0800 IMPALA-10524: Changes to HdfsPartition for third party extensions. Some changes are needed to HdfsPartition and other related classes to support third-party extensions. These changes include: - A protected constructor that allows a subclass to instantiate HdfsPartition using its own Builder. - Widened access modifiers on various methods and variables so that they are visible to third-party extensions. - A new getHostIndex() method that allows a subclass to override how the host index is retrieved. - A new default method "getFileSystem()" on FeFsPartition that allows a third-party extension to override how the FileSystem is obtained from the partition object. Change-Id: I5a792642f27228118ac8f2e8ef98e8ba7aee4a46 Reviewed-on: http://gerrit.cloudera.org:8080/17092 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../org/apache/impala/analysis/AnalyticWindow.java | 5 ++-- .../org/apache/impala/analysis/StatementBase.java | 5 ++-- .../org/apache/impala/catalog/FeFsPartition.java | 17 ++++++++++++ .../org/apache/impala/catalog/HdfsPartition.java | 27 +++++++++++++++++- .../catalog/HdfsPartitionLocationCompressor.java | 2 +- .../impala/catalog/local/LocalFsPartition.java | 5 ++++ .../apache/impala/planner/AnalyticEvalNode.java | 2 +- .../impala/planner/CardinalityCheckNode.java | 2 +- .../org/apache/impala/planner/HdfsScanNode.java | 2 +- .../org/apache/impala/planner/KuduScanNode.java | 2 +- .../java/org/apache/impala/planner/PlanNode.java | 2 +- .../java/org/apache/impala/planner/Planner.java | 32 ++++++++++++++-------- .../java/org/apache/impala/planner/SortNode.java | 2 +- .../org/apache/impala/planner/SubplanNode.java | 2 +- 14 files changed, 82 insertions(+), 25 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java 
b/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java index 7b460f9..48b130c 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java +++ b/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java @@ -42,7 +42,7 @@ public class AnalyticWindow { new Boundary(BoundaryType.UNBOUNDED_PRECEDING, null), new Boundary(BoundaryType.CURRENT_ROW, null)); - enum Type { + public enum Type { ROWS("ROWS"), RANGE("RANGE"); @@ -131,8 +131,7 @@ public class AnalyticWindow { this(type, e, null); } - // c'tor used by clone() - private Boundary(BoundaryType type, Expr e, BigDecimal offsetValue) { + public Boundary(BoundaryType type, Expr e, BigDecimal offsetValue) { Preconditions.checkState( (type.isOffset() && e != null) || (!type.isOffset() && e == null)); diff --git a/fe/src/main/java/org/apache/impala/analysis/StatementBase.java b/fe/src/main/java/org/apache/impala/analysis/StatementBase.java index e0fb2de..4fb2d43 100644 --- a/fe/src/main/java/org/apache/impala/analysis/StatementBase.java +++ b/fe/src/main/java/org/apache/impala/analysis/StatementBase.java @@ -200,8 +200,9 @@ public abstract class StatementBase extends StmtNode { * If strictDecimal is true, only consider casts that result in no loss of information * when casting between decimal types. 
*/ - protected Expr checkTypeCompatibility(String dstTableName, Column dstCol, Expr srcExpr, - boolean strictDecimal, Expr widestTypeSrcExpr) throws AnalysisException { + public static Expr checkTypeCompatibility(String dstTableName, Column dstCol, + Expr srcExpr, boolean strictDecimal, Expr widestTypeSrcExpr) + throws AnalysisException { Type dstColType = dstCol.getType(); Type srcExprType = srcExpr.getType(); diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java index 4b66148..8ec49bb 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java @@ -16,18 +16,23 @@ // under the License. package org.apache.impala.catalog; +import java.io.IOException; import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.impala.analysis.LiteralExpr; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.common.FileSystemUtil; import org.apache.impala.thrift.TAccessLevel; import org.apache.impala.thrift.THdfsPartitionLocation; +import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.thrift.TPartitionStats; +import org.apache.impala.util.ListMap; /** * Frontend interface for interacting with a single filesystem-based partition. @@ -50,6 +55,11 @@ public interface FeFsPartition { FeFsTable getTable(); /** + * @return ListMap<hostIndex> from partition's table. 
+ */ + ListMap<TNetworkAddress> getHostIndex(); + + /** * @return the FsType that this partition is stored on */ FileSystemUtil.FsType getFsType(); @@ -95,6 +105,13 @@ public interface FeFsPartition { Path getLocationPath(); /** + * @return the FileSystem of this partition + */ + default FileSystem getFileSystem(Configuration conf) throws IOException { + return getLocationPath().getFileSystem(conf); + } + + /** * @return the HDFS permissions Impala has to this partition's directory - READ_ONLY, * READ_WRITE, etc. */ diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java index 3d3b0c3..28f1f3d 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java @@ -682,7 +682,29 @@ public class HdfsPartition extends CatalogObjectImpl // it's not used in coordinators. private final InFlightEvents inFlightEvents_; - private HdfsPartition(HdfsTable table, long id, long prevId, String partName, + /** + * Constructor. Needed for third party extensions that want to use their own builder + * to construct the object. 
+ */ + protected HdfsPartition(HdfsTable table, long prevId, String partName, + List<LiteralExpr> partitionKeyValues, HdfsStorageDescriptor fileFormatDescriptor, + @Nonnull ImmutableList<byte[]> encodedFileDescriptors, + ImmutableList<byte[]> encodedInsertFileDescriptors, + ImmutableList<byte[]> encodedDeleteFileDescriptors, + HdfsPartitionLocationCompressor.Location location, + boolean isMarkedCached, TAccessLevel accessLevel, Map<String, String> hmsParameters, + CachedHmsPartitionDescriptor cachedMsPartitionDescriptor, + byte[] partitionStats, boolean hasIncrementalStats, long numRows, long writeId, + InFlightEvents inFlightEvents) { + this(table, partitionIdCounter_.getAndIncrement(), prevId, partName, + partitionKeyValues, fileFormatDescriptor, encodedFileDescriptors, + encodedInsertFileDescriptors, encodedDeleteFileDescriptors, location, + isMarkedCached, accessLevel, hmsParameters, cachedMsPartitionDescriptor, + partitionStats, hasIncrementalStats, numRows, writeId, inFlightEvents); + } + + + protected HdfsPartition(HdfsTable table, long id, long prevId, String partName, List<LiteralExpr> partitionKeyValues, HdfsStorageDescriptor fileFormatDescriptor, @Nonnull ImmutableList<byte[]> encodedFileDescriptors, ImmutableList<byte[]> encodedInsertFileDescriptors, @@ -786,6 +808,9 @@ public class HdfsPartition extends CatalogObjectImpl @Override // FeFsPartition public HdfsTable getTable() { return table_; } + @Override // FeFsPartition + public ListMap<TNetworkAddress> getHostIndex() { return table_.getHostIndex(); } + @Override public FileSystemUtil.FsType getFsType() { Preconditions.checkNotNull(getLocationPath().toUri().getScheme(), diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java index 939bcf6..2ba6414 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java +++ 
b/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java @@ -35,7 +35,7 @@ import com.google.common.base.Preconditions; * prefixes, like table locations. * */ -class HdfsPartitionLocationCompressor { +public class HdfsPartitionLocationCompressor { int numClusteringColumns_; // A bi-directional map between partition location prefixes and their compressed diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java index 1bed832..575e01a 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java @@ -39,7 +39,9 @@ import org.apache.impala.common.FileSystemUtil; import org.apache.impala.compat.MetastoreShim; import org.apache.impala.thrift.TAccessLevel; import org.apache.impala.thrift.THdfsPartitionLocation; +import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.thrift.TPartitionStats; +import org.apache.impala.util.ListMap; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -104,6 +106,9 @@ public class LocalFsPartition implements FeFsPartition { return table_; } + @Override // FeFsPartition + public ListMap<TNetworkAddress> getHostIndex() { return table_.getHostIndex(); } + @Override public FileSystemUtil.FsType getFsType() { Preconditions.checkNotNull(getLocationPath().toUri().getScheme(), diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java index 31463cf..252df3b 100644 --- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java +++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java @@ -251,7 +251,7 @@ public class AnalyticEvalNode extends PlanNode { } @Override - protected void computeStats(Analyzer analyzer) { + public void computeStats(Analyzer analyzer) { 
super.computeStats(analyzer); cardinality_ = getChild(0).cardinality_; cardinality_ = capCardinalityAtLimit(cardinality_); diff --git a/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java b/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java index f7629a1..00cd67f 100644 --- a/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java +++ b/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java @@ -40,7 +40,7 @@ import com.google.common.base.Preconditions; public class CardinalityCheckNode extends PlanNode { private final String displayStatement_; - protected CardinalityCheckNode(PlanNodeId id, PlanNode child, String displayStmt) { + public CardinalityCheckNode(PlanNodeId id, PlanNode child, String displayStmt) { super(id, "CARDINALITY CHECK"); Preconditions.checkState(child.getLimit() <= 2); cardinality_ = 1; diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index bbff229..f22b835 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -1194,7 +1194,7 @@ public class HdfsScanNode extends ScanNode { // Translate from the host index (local to the HdfsTable) to network address. int replicaHostIdx = FileBlock.getReplicaHostIdx(block, j); TNetworkAddress networkAddress = - partition.getTable().getHostIndex().getEntry(replicaHostIdx); + partition.getHostIndex().getEntry(replicaHostIdx); Preconditions.checkNotNull(networkAddress); // Translate from network address to the global (to this request) host index. 
Integer globalHostIdx = analyzer.getHostIndex().getIndex(networkAddress); diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java index e4a2d0c..7e2d520 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java @@ -352,7 +352,7 @@ public class KuduScanNode extends ScanNode { } @Override - protected void computeStats(Analyzer analyzer) { + public void computeStats(Analyzer analyzer) { super.computeStats(analyzer); computeNumNodes(analyzer); diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java index a91cb4d..52afb59 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java @@ -590,7 +590,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> { * from init() (to facilitate inserting additional nodes during plan * partitioning w/o the need to call init() recursively on the whole tree again). 
*/ - protected void computeStats(Analyzer analyzer) { + public void computeStats(Analyzer analyzer) { avgRowSize_ = 0.0F; for (TupleId tid: tupleIds_) { TupleDescriptor desc = analyzer.getTupleDesc(tid); diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index e769fb0..bd4acb5 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -642,31 +642,37 @@ public class Planner { return newJoinNode; } - private void checkForSmallQueryOptimization(PlanNode singleNodePlan) { + public static void checkForSmallQueryOptimization(PlanNode singleNodePlan, + PlannerContext ctx) { MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor(); singleNodePlan.accept(visitor); if (!visitor.valid()) return; // This optimization executes the plan on a single node so the threshold must // be based on the total number of rows processed. long maxRowsProcessed = visitor.getMaxRowsProcessed(); - int threshold = ctx_.getQueryOptions().exec_single_node_rows_threshold; + int threshold = ctx.getQueryOptions().exec_single_node_rows_threshold; if (maxRowsProcessed < threshold) { // Execute on a single node and disable codegen for small results LOG.trace("Query is small enough to execute on a single node: maxRowsProcessed = " + maxRowsProcessed); - ctx_.getQueryOptions().setNum_nodes(1); - ctx_.getQueryCtx().disable_codegen_hint = true; - if (maxRowsProcessed < ctx_.getQueryOptions().batch_size || - maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) { + ctx.getQueryOptions().setNum_nodes(1); + ctx.getQueryCtx().disable_codegen_hint = true; + if (maxRowsProcessed < ctx.getQueryOptions().batch_size || + maxRowsProcessed < 1024 && ctx.getQueryOptions().batch_size == 0) { // Only one scanner thread for small queries - ctx_.getQueryOptions().setNum_scanner_threads(1); + ctx.getQueryOptions().setNum_scanner_threads(1); } // disable 
runtime filters - ctx_.getQueryOptions().setRuntime_filter_mode(TRuntimeFilterMode.OFF); + ctx.getQueryOptions().setRuntime_filter_mode(TRuntimeFilterMode.OFF); } } - private void checkForDisableCodegen(PlanNode distributedPlan) { + private void checkForSmallQueryOptimization(PlanNode singleNodePlan) { + checkForSmallQueryOptimization(singleNodePlan, ctx_); + } + + public static void checkForDisableCodegen(PlanNode distributedPlan, + PlannerContext ctx) { MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor(); distributedPlan.accept(visitor); if (!visitor.valid()) return; @@ -674,11 +680,15 @@ public class Planner { // reduce per-node execution time enough to justify the cost of codegen. Per-node // execution time is correlated with the number of rows flowing through the plan. if (visitor.getMaxRowsProcessedPerNode() - < ctx_.getQueryOptions().getDisable_codegen_rows_threshold()) { - ctx_.getQueryCtx().disable_codegen_hint = true; + < ctx.getQueryOptions().getDisable_codegen_rows_threshold()) { + ctx.getQueryCtx().disable_codegen_hint = true; } } + private void checkForDisableCodegen(PlanNode distributedPlan) { + checkForDisableCodegen(distributedPlan, ctx_); + } + /** * Insert a sort node on top of the plan, depending on the clustered/noclustered * plan hint and on the 'sort.columns' table property. 
If clustering is enabled in diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java index 79913ed..225c372 100644 --- a/fe/src/main/java/org/apache/impala/planner/SortNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java @@ -293,7 +293,7 @@ public class SortNode extends PlanNode { } @Override - protected void computeStats(Analyzer analyzer) { + public void computeStats(Analyzer analyzer) { super.computeStats(analyzer); if (isTypeTopN() && includeTies_) { cardinality_ = capCardinalityAtLimit(getChild(0).cardinality_, limitWithTies_); diff --git a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java index f964c6d..eb57df6 100644 --- a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java @@ -85,7 +85,7 @@ public class SubplanNode extends PlanNode { } @Override - protected void computeStats(Analyzer analyzer) { + public void computeStats(Analyzer analyzer) { super.computeStats(analyzer); if (getChild(0).cardinality_ != -1 && getChild(1).cardinality_ != -1) { cardinality_ =
