[#140722851] Applied code-review feedback.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/478f0a89 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/478f0a89 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/478f0a89 Branch: refs/heads/140722851 Commit: 478f0a89163554a835bf3963b4a7e2df3341fe34 Parents: 73a2048 Author: Oleksandr Diachenko <[email protected]> Authored: Mon Mar 27 17:11:47 2017 -0700 Committer: Oleksandr Diachenko <[email protected]> Committed: Mon Mar 27 17:11:47 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hawq/pxf/api/StatsAccessor.java | 40 ++++++++++++++++++ .../pxf/api/utilities/EnumAggregationType.java | 2 +- .../pxf/api/utilities/FragmentMetadata.java | 17 +++++++- .../hawq/pxf/api/utilities/Utilities.java | 44 ++++++++++++++++++-- .../hawq/pxf/plugins/hive/HiveORCAccessor.java | 42 ++++++++++--------- .../org/apache/hawq/pxf/service/AggBridge.java | 5 +++ .../pxf/service/utilities/ProtocolData.java | 1 + 7 files changed, 127 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java new file mode 100644 index 0000000..724448b --- /dev/null +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/StatsAccessor.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hawq.pxf.api; + +import org.apache.hawq.pxf.api.OneRow; + +/** + * Interface of accessor which can leverage statistic information for aggregate queries + * + */ +public interface StatsAccessor { + + /** + * Method which reads needed statistics for current split + */ + public void retrieveStats() throws Exception; + + /** + * Returns next tuple based on statistics information without actual reading of data + */ + public OneRow emitAggObject(); + +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java index ee38f18..69716c6 100644 --- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java @@ -27,7 +27,7 @@ public enum EnumAggregationType { private String aggOperationCode; private boolean optimizationSupported; - EnumAggregationType(String aggOperationCode, boolean optimizationSupported) { + private EnumAggregationType(String aggOperationCode, boolean optimizationSupported) { 
this.aggOperationCode = aggOperationCode; this.optimizationSupported = optimizationSupported; } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java index 7a82065..7f266ec 100644 --- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java @@ -19,9 +19,12 @@ package org.apache.hawq.pxf.api.utilities; +/** + * Class which holds metadata of a file split and locality information. + * + */ public class FragmentMetadata { - private long start; private long end; private String[] hosts; @@ -32,6 +35,10 @@ public class FragmentMetadata { this.hosts = hosts; } + /** + * + * @return position in bytes where given data fragment starts + */ public long getStart() { return start; } @@ -40,6 +47,10 @@ public class FragmentMetadata { this.start = start; } + /** + * + * @return position in bytes where given data fragment ends + */ public long getEnd() { return end; } @@ -48,6 +59,10 @@ public class FragmentMetadata { this.end = end; } + /** + * + * @return all hosts that hold the given data fragment + */ public String[] getHosts() { return hosts; } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java index f59a07a..f948888 100644 --- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java +++
b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java @@ -23,6 +23,8 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.StatsAccessor; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -156,6 +158,14 @@ public class Utilities { return input.replaceAll("[^a-zA-Z0-9_:/-]", "."); } + /** + * Parses input data and returns fragment metadata. + * + * @param inputData input data which has protocol information + * @return fragment metadata + * @throws IllegalArgumentException if fragment metadata information wasn't found in input data + * @throws Exception + */ public static FragmentMetadata parseFragmentMetadata(InputData inputData) throws Exception { byte[] serializedLocation = inputData.getFragmentMetadata(); if (serializedLocation == null) { @@ -166,9 +176,18 @@ public class Utilities { long start = objectStream.readLong(); long end = objectStream.readLong(); String[] hosts = (String[]) objectStream.readObject(); - LOG.debug("parsed file split: path " + inputData.getDataSource() - + ", start " + start + ", end " + end + ", hosts " - + ArrayUtils.toString(hosts)); + if (LOG.isDebugEnabled()) { + StringBuilder sb = new StringBuilder(); + sb.append("parsed file split: path "); + sb.append(inputData.getDataSource()); + sb.append(", start "); + sb.append(start); + sb.append(", end "); + sb.append(end); + sb.append(", hosts "); + sb.append(ArrayUtils.toString(hosts)); + LOG.debug(sb.toString()); + } FragmentMetadata fragmentMetadata = new FragmentMetadata(start, end, hosts); return fragmentMetadata; } catch (Exception e) { @@ -176,4 +195,23 @@ public class Utilities { throw e; } } + + + /** + * Determines whether accessor should use statistics to optimize reading results + * + * @param accessor accessor instance + * @param inputData 
input data which has protocol information + * @return true if this accessor should use statistic information + */ + public static boolean useStats(ReadAccessor accessor, InputData inputData) { + if (accessor instanceof StatsAccessor) { + if (inputData != null && !inputData.hasFilter() + && inputData.getAggType() != null + && inputData.getAggType().isOptimizationSupported()) { + return true; + } + } + return false; + } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java index 0e2fc2a..24b355c 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java @@ -22,7 +22,6 @@ package org.apache.hawq.pxf.plugins.hive; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.Reader; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; @@ -31,16 +30,14 @@ import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hawq.pxf.api.BasicFilter; import org.apache.hawq.pxf.api.LogicalFilter; import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.StatsAccessor; import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; import org.apache.hawq.pxf.api.utilities.EnumAggregationType; import org.apache.hawq.pxf.api.utilities.FragmentMetadata; import org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.api.utilities.Utilities; -import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities; import 
org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics; -import org.apache.hadoop.hive.ql.io.orc.StripeInformation; import org.apache.hadoop.mapred.*; import java.io.IOException; @@ -56,7 +53,7 @@ import static org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_ * This class replaces the generic HiveAccessor for a case where a table is stored entirely as ORC files. * Use together with {@link HiveInputFormatFragmenter}/{@link HiveColumnarSerdeResolver} */ -public class HiveORCAccessor extends HiveAccessor { +public class HiveORCAccessor extends HiveAccessor implements StatsAccessor { private static final Log LOG = LogFactory.getLog(HiveORCAccessor.class); @@ -81,18 +78,7 @@ public class HiveORCAccessor extends HiveAccessor { HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.ORC_SERDE); initPartitionFields(hiveUserData.getPartitionKeys()); filterInFragmenter = hiveUserData.isFilterInFragmenter(); - - if (inputData != null && !inputData.hasFilter() && inputData.getAggType() != null && inputData.getAggType().isOptimizationSupported()) { - useStats = true; - } - } - - private void retrieveStats() throws Exception { - FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData); - /* We are using file-level stats therefore if file has multiple splits, - * it's enough to return count for a first split in file*/ - if (fragmentMetadata.getStart() == 0) - this.count = this.orcReader.getNumberOfRows(); + useStats = Utilities.useStats(this, inputData); } @Override @@ -104,7 +90,7 @@ public class HiveORCAccessor extends HiveAccessor { } retrieveStats(); objectsEmitted = 0; - return true; + return super.openForRead(); } else { addColumns(); addFilters(); @@ -259,7 +245,25 @@ public class HiveORCAccessor extends HiveAccessor { return super.readNextObject(); } - private OneRow emitAggObject() { + /** + * Fetches 
file-level statistics from an ORC file. + */ + @Override + public void retrieveStats() throws Exception { + FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData); + /* + * We are using file-level stats therefore if file has multiple splits, + * it's enough to return count for a first split in file + */ + if (fragmentMetadata.getStart() == 0) + this.count = this.orcReader.getNumberOfRows(); + } + + /** + * Emits tuple without reading from disk, currently supports COUNT + */ + @Override + public OneRow emitAggObject() { OneRow row = null; switch (inputData.getAggType()) { case COUNT: http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java index d274864..12f44e2 100644 --- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java @@ -32,9 +32,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.collections.map.LRUMap; +/** + * Bridge class optimized for aggregate queries. 
+ * + */ public class AggBridge extends ReadBridge implements Bridge { private static final Log LOG = LogFactory.getLog(AggBridge.class); + /* Avoid resolving rows with the same key twice */ private LRUMap resolvedFieldsCache; public AggBridge(ProtocolData protData) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/478f0a89/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java index 6f21068..0de356b 100644 --- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java @@ -117,6 +117,7 @@ public class ProtocolData extends InputData { // Store alignment for global use as a system property System.setProperty("greenplum.alignment", getProperty("ALIGNMENT")); + //Get aggregation operation String aggTypeOperationName = getOptionalProperty("AGG-TYPE"); this.setAggType(EnumAggregationType.getAggregationType(aggTypeOperationName));
