Repository: incubator-hawq Updated Branches: refs/heads/140722851 [created] 478f0a891
[#140722851] PXF to use file level stats for count(*) without condition. Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/73a20485 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/73a20485 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/73a20485 Branch: refs/heads/140722851 Commit: 73a204854f6400d31f5b26237ed8df9dfb087d91 Parents: 55d9e85 Author: Oleksandr Diachenko <[email protected]> Authored: Wed Mar 8 13:45:13 2017 -0800 Committer: Oleksandr Diachenko <[email protected]> Committed: Fri Mar 17 15:18:25 2017 -0700 ---------------------------------------------------------------------- .../pxf/api/utilities/EnumAggregationType.java | 51 +++++++++++ .../pxf/api/utilities/FragmentMetadata.java | 59 +++++++++++++ .../hawq/pxf/api/utilities/InputData.java | 9 ++ .../hawq/pxf/api/utilities/Utilities.java | 25 ++++++ .../plugins/hdfs/HdfsAtomicDataAccessor.java | 2 +- .../hdfs/HdfsSplittableDataAccessor.java | 2 +- .../plugins/hdfs/utilities/HdfsUtilities.java | 26 +----- .../hawq/pxf/plugins/hive/HiveORCAccessor.java | 69 ++++++++++++++- .../plugins/hive/utilities/HiveUtilities.java | 23 +++++ pxf/pxf-service/src/main/.DS_Store | Bin 0 -> 6148 bytes .../org/apache/hawq/pxf/service/AggBridge.java | 86 +++++++++++++++++++ .../hawq/pxf/service/rest/BridgeResource.java | 4 +- .../pxf/service/utilities/ProtocolData.java | 5 ++ 13 files changed, 333 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java new 
file mode 100644 index 0000000..ee38f18 --- /dev/null +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/EnumAggregationType.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hawq.pxf.api.utilities; + +public enum EnumAggregationType { + + COUNT("count", true), + UNKNOWN("unknown", false); + + private String aggOperationCode; + private boolean optimizationSupported; + + EnumAggregationType(String aggOperationCode, boolean optimizationSupported) { + this.aggOperationCode = aggOperationCode; + this.optimizationSupported = optimizationSupported; + } + + public String getAggOperationCode() { + return this.aggOperationCode; + } + + public boolean isOptimizationSupported() { + return this.optimizationSupported; + } + + public static EnumAggregationType getAggregationType(String aggOperationCode) { + for (EnumAggregationType at : values()) { + if (at.getAggOperationCode().equals(aggOperationCode)) { + return at; + } + } + return EnumAggregationType.UNKNOWN; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java 
---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java new file mode 100644 index 0000000..7a82065 --- /dev/null +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/FragmentMetadata.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hawq.pxf.api.utilities; + +public class FragmentMetadata { + + + private long start; + private long end; + private String[] hosts; + + public FragmentMetadata(long start, long end, String[] hosts) { + this.start = start; + this.end = end; + this.hosts = hosts; + } + + public long getStart() { + return start; + } + + public void setStart(long start) { + this.start = start; + } + + public long getEnd() { + return end; + } + + public void setEnd(long end) { + this.end = end; + } + + public String[] getHosts() { + return hosts; + } + + public void setHosts(String[] hosts) { + this.hosts = hosts; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java index 9816fdc..7950ed3 100644 --- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/InputData.java @@ -52,6 +52,7 @@ public class InputData { protected String remoteLogin; protected String remoteSecret; protected int dataFragment; /* should be deprecated */ + private EnumAggregationType aggType; /** * When false the bridge has to run in synchronized mode. 
default value - @@ -335,4 +336,12 @@ public class InputData { return dataFragment; } + public EnumAggregationType getAggType() { + return aggType; + } + + public void setAggType(EnumAggregationType aggType) { + this.aggType = aggType; + } + } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java index 51326bc..f59a07a 100644 --- a/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java +++ b/pxf/pxf-api/src/main/java/org/apache/hawq/pxf/api/utilities/Utilities.java @@ -20,9 +20,13 @@ package org.apache.hawq.pxf.api.utilities; */ import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -151,4 +155,25 @@ public class Utilities { } return input.replaceAll("[^a-zA-Z0-9_:/-]", "."); } + + public static FragmentMetadata parseFragmentMetadata(InputData inputData) throws Exception { + byte[] serializedLocation = inputData.getFragmentMetadata(); + if (serializedLocation == null) { + throw new IllegalArgumentException("Missing fragment location information"); + } + try (ByteArrayInputStream bytesStream = new ByteArrayInputStream(serializedLocation); + ObjectInputStream objectStream = new ObjectInputStream(bytesStream)) { + long start = objectStream.readLong(); + long end = objectStream.readLong(); + String[] hosts = (String[]) objectStream.readObject(); + LOG.debug("parsed file split: path " + inputData.getDataSource() + + ", start 
" + start + ", end " + end + ", hosts " + + ArrayUtils.toString(hosts)); + FragmentMetadata fragmentMetadata = new FragmentMetadata(start, end, hosts); + return fragmentMetadata; + } catch (Exception e) { + LOG.error("Unable to parse fragment metadata"); + throw e; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java index 178b774..a95248e 100644 --- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java +++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsAtomicDataAccessor.java @@ -65,7 +65,7 @@ public abstract class HdfsAtomicDataAccessor extends Plugin implements ReadAcces // 1. 
Load Hadoop configuration defined in $HADOOP_HOME/conf/*.xml files conf = new Configuration(); - fileSplit = HdfsUtilities.parseFragmentMetadata(inputData); + fileSplit = HdfsUtilities.parseFileSplit(inputData); } /** http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java index 0174bd8..b61d76a 100644 --- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java +++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/HdfsSplittableDataAccessor.java @@ -75,7 +75,7 @@ public abstract class HdfsSplittableDataAccessor extends Plugin implements @Override public boolean openForRead() throws Exception { LinkedList<InputSplit> requestSplits = new LinkedList<InputSplit>(); - FileSplit fileSplit = HdfsUtilities.parseFragmentMetadata(inputData); + FileSplit fileSplit = HdfsUtilities.parseFileSplit(inputData); requestSplits.add(fileSplit); // Initialize record reader based on current split http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java index c99ccd6..1aae838 100644 --- a/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java +++ b/pxf/pxf-hdfs/src/main/java/org/apache/hawq/pxf/plugins/hdfs/utilities/HdfsUtilities.java @@ -22,6 +22,7 @@ package 
org.apache.hawq.pxf.plugins.hdfs.utilities; import org.apache.hawq.pxf.api.io.DataType; import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.utilities.FragmentMetadata; import org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.api.utilities.Utilities; import org.apache.avro.Schema; @@ -30,7 +31,6 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.mapred.FsInput; -import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -171,29 +171,11 @@ public class HdfsUtilities { * @param inputData request input data * @return FileSplit with fragment metadata */ - public static FileSplit parseFragmentMetadata(InputData inputData) { + public static FileSplit parseFileSplit(InputData inputData) { try { - byte[] serializedLocation = inputData.getFragmentMetadata(); - if (serializedLocation == null) { - throw new IllegalArgumentException( - "Missing fragment location information"); - } - - ByteArrayInputStream bytesStream = new ByteArrayInputStream( - serializedLocation); - ObjectInputStream objectStream = new ObjectInputStream(bytesStream); - - long start = objectStream.readLong(); - long end = objectStream.readLong(); - - String[] hosts = (String[]) objectStream.readObject(); - - FileSplit fileSplit = new FileSplit(new Path( - inputData.getDataSource()), start, end, hosts); + FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData); - LOG.debug("parsed file split: path " + inputData.getDataSource() - + ", start " + start + ", end " + end + ", hosts " - + ArrayUtils.toString(hosts)); + FileSplit fileSplit = new FileSplit(new Path(inputData.getDataSource()), fragmentMetadata.getStart(), fragmentMetadata.getEnd(), fragmentMetadata.getHosts()); return fileSplit; 
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java index 07348b0..0e2fc2a 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/HiveORCAccessor.java @@ -22,17 +22,28 @@ package org.apache.hawq.pxf.plugins.hive; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.Reader; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hawq.pxf.api.BasicFilter; import org.apache.hawq.pxf.api.LogicalFilter; +import org.apache.hawq.pxf.api.OneRow; import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.EnumAggregationType; +import org.apache.hawq.pxf.api.utilities.FragmentMetadata; import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Utilities; +import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities; import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics; +import org.apache.hadoop.hive.ql.io.orc.StripeInformation; +import org.apache.hadoop.mapred.*; +import java.io.IOException; import java.sql.Date; import java.util.ArrayList; import java.util.Arrays; @@ -53,6 +64,11 @@ public class HiveORCAccessor extends 
HiveAccessor { private final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns"; private final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names"; private final String SARG_PUSHDOWN = "sarg.pushdown"; + protected Reader orcReader; + + private boolean useStats; + private long count; + private long objectsEmitted; /** * Constructs a HiveORCFileAccessor. @@ -65,13 +81,35 @@ public class HiveORCAccessor extends HiveAccessor { HiveUserData hiveUserData = HiveUtilities.parseHiveUserData(input, PXF_HIVE_SERDES.ORC_SERDE); initPartitionFields(hiveUserData.getPartitionKeys()); filterInFragmenter = hiveUserData.isFilterInFragmenter(); + + if (inputData != null && !inputData.hasFilter() && inputData.getAggType() != null && inputData.getAggType().isOptimizationSupported()) { + useStats = true; + } + } + + private void retrieveStats() throws Exception { + FragmentMetadata fragmentMetadata = Utilities.parseFragmentMetadata(inputData); + /* We are using file-level stats therefore if file has multiple splits, + * it's enough to return count for a first split in file*/ + if (fragmentMetadata.getStart() == 0) + this.count = this.orcReader.getNumberOfRows(); } @Override public boolean openForRead() throws Exception { - addColumns(); - addFilters(); - return super.openForRead(); + if (useStats) { + orcReader = HiveUtilities.getOrcReader(inputData); + if (orcReader == null) { + return false; + } + retrieveStats(); + objectsEmitted = 0; + return true; + } else { + addColumns(); + addFilters(); + return super.openForRead(); + } } /** @@ -213,4 +251,29 @@ public class HiveORCAccessor extends HiveAccessor { return true; } + @Override + public OneRow readNextObject() throws IOException { + if (useStats) + return emitAggObject(); + else + return super.readNextObject(); + } + + private OneRow emitAggObject() { + OneRow row = null; + switch (inputData.getAggType()) { + case COUNT: + if (objectsEmitted < count) { + objectsEmitted++; + row = new OneRow(key, data); + } + 
break; + default: { + throw new UnsupportedOperationException( + "Aggregation operation is not supported."); + } + } + return row; + } + } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java index 3328c9f..6d33283 100644 --- a/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java +++ b/pxf/pxf-hive/src/main/java/org/apache/hawq/pxf/plugins/hive/utilities/HiveUtilities.java @@ -30,6 +30,8 @@ import java.util.Properties; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; @@ -45,6 +47,7 @@ import org.apache.hawq.pxf.api.Metadata.Field; import org.apache.hawq.pxf.api.UnsupportedTypeException; import org.apache.hawq.pxf.api.UserDataException; import org.apache.hawq.pxf.api.utilities.EnumHawqType; +import org.apache.hawq.pxf.api.utilities.FragmentMetadata; import org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.api.utilities.Utilities; import org.apache.hawq.pxf.api.io.DataType; @@ -57,6 +60,12 @@ import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.apache.hawq.pxf.plugins.hive.HiveInputFormatFragmenter.PXF_HIVE_INPUT_FORMATS; import org.apache.hawq.pxf.plugins.hive.HiveUserData; import org.apache.hawq.pxf.plugins.hive.utilities.HiveUtilities.PXF_HIVE_SERDES; +import org.apache.hadoop.hive.ql.io.orc.Reader; +import 
org.apache.hadoop.hive.ql.io.orc.ReaderImpl; +import org.apache.hadoop.hive.ql.io.orc.OrcFile; +import org.apache.hadoop.hive.ql.io.orc.ColumnStatistics; +import org.apache.hadoop.hive.ql.io.orc.StripeInformation; +import org.apache.hadoop.hive.ql.io.orc.StripeStatistics; /** * Class containing helper functions connecting @@ -627,4 +636,18 @@ public class HiveUtilities { return deserializer; } + + public static Reader getOrcReader(InputData inputData) { + try { + + Path path = new Path(inputData.getDataSource()); + Reader reader = OrcFile.createReader(path.getFileSystem(new Configuration()), path); + + return reader; + + } catch (Exception e) { + throw new RuntimeException( + "Exception while getting orc reader", e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-service/src/main/.DS_Store ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/.DS_Store b/pxf/pxf-service/src/main/.DS_Store new file mode 100644 index 0000000..db3318c Binary files /dev/null and b/pxf/pxf-service/src/main/.DS_Store differ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java new file mode 100644 index 0000000..d274864 --- /dev/null +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/AggBridge.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hawq.pxf.service; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hawq.pxf.api.BadRecordException; +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.service.io.Writable; +import org.apache.hawq.pxf.service.utilities.ProtocolData; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.collections.map.LRUMap; + +public class AggBridge extends ReadBridge implements Bridge { + + private static final Log LOG = LogFactory.getLog(AggBridge.class); + private LRUMap resolvedFieldsCache; + + public AggBridge(ProtocolData protData) throws Exception { + super(protData); + } + + @Override + public boolean beginIteration() throws Exception { + /* Initialize LRU cache with 100 items*/ + resolvedFieldsCache = new LRUMap(); + return super.fileAccessor.openForRead(); + } + + @Override + @SuppressWarnings("unchecked") + public Writable getNext() throws Exception { + Writable output = null; + List<OneField> resolvedFields = null; + OneRow onerow = null; + + if (!outputQueue.isEmpty()) { + return outputQueue.pop(); + } + + try { + while (outputQueue.isEmpty()) { + onerow = fileAccessor.readNextObject(); + if (onerow == null) { + break; + } + resolvedFields = (List<OneField>) 
resolvedFieldsCache.get(onerow.getKey()); + if (resolvedFields == null) { + resolvedFields = fieldsResolver.getFields(onerow); + resolvedFieldsCache.put(onerow.getKey(), resolvedFields); + } + outputQueue = outputBuilder.makeOutput(resolvedFields); + if (!outputQueue.isEmpty()) { + output = outputQueue.pop(); + break; + } + } + } catch (Exception ex) { + throw ex; + } + + return output; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java index 104f353..39dc985 100644 --- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/rest/BridgeResource.java @@ -39,7 +39,7 @@ import javax.ws.rs.core.StreamingOutput; import org.apache.catalina.connector.ClientAbortException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - +import org.apache.hawq.pxf.service.AggBridge; import org.apache.hawq.pxf.service.Bridge; import org.apache.hawq.pxf.service.ReadBridge; import org.apache.hawq.pxf.service.ReadSamplingBridge; @@ -98,6 +98,8 @@ public class BridgeResource extends RestResource { float sampleRatio = protData.getStatsSampleRatio(); if (sampleRatio > 0) { bridge = new ReadSamplingBridge(protData); + } else if (protData.getAggType().isOptimizationSupported()) { + bridge = new AggBridge(protData); } else { bridge = new ReadBridge(protData); } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/73a20485/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java ---------------------------------------------------------------------- diff --git 
a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java index dc2a110..6f21068 100644 --- a/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java +++ b/pxf/pxf-service/src/main/java/org/apache/hawq/pxf/service/utilities/ProtocolData.java @@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hawq.pxf.api.OutputFormat; import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.EnumAggregationType; import org.apache.hawq.pxf.api.utilities.InputData; import org.apache.hawq.pxf.api.utilities.ProfilesConf; @@ -115,6 +116,10 @@ public class ProtocolData extends InputData { // Store alignment for global use as a system property System.setProperty("greenplum.alignment", getProperty("ALIGNMENT")); + + String aggTypeOperationName = getOptionalProperty("AGG-TYPE"); + + this.setAggType(EnumAggregationType.getAggregationType(aggTypeOperationName)); } /**
