PHOENIX-3416 Memory leak in PhoenixStorageHandler Signed-off-by: Sergey Soldatov <s...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/46d4bb4c Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/46d4bb4c Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/46d4bb4c Branch: refs/heads/4.x-HBase-0.98 Commit: 46d4bb4ca0a9f90316c3f36d397b36405d8766e7 Parents: 3bb1a2b Author: Jeongdae Kim <kjd9...@gmail.com> Authored: Thu Oct 27 20:50:53 2016 +0900 Committer: Sergey Soldatov <s...@apache.org> Committed: Wed Nov 2 12:58:32 2016 -0700 ---------------------------------------------------------------------- .../phoenix/hive/PhoenixStorageHandler.java | 14 +--- .../hive/mapreduce/PhoenixInputFormat.java | 37 +++++---- .../hive/ppd/PhoenixPredicateDecomposer.java | 15 +++- .../ppd/PhoenixPredicateDecomposerManager.java | 83 -------------------- 4 files changed, 33 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/46d4bb4c/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java index e8b5b19..2bc8ace 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java @@ -40,8 +40,6 @@ import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat; import org.apache.phoenix.hive.mapreduce.PhoenixOutputFormat; import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposer; -import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposerManager; -import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import java.util.List; @@ -176,19 +174,9 @@ public class PhoenixStorageHandler extends DefaultStorageHandler implements public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer deserializer, ExprNodeDesc predicate) { PhoenixSerDe phoenixSerDe = (PhoenixSerDe) deserializer; - String tableName = phoenixSerDe.getTableProperties().getProperty - (PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME); - String predicateKey = PhoenixStorageHandlerUtil.getTableKeyOfSession(jobConf, tableName); - - if (LOG.isDebugEnabled()) { - LOG.debug("Decomposing predicate with predicateKey : " + predicateKey); - } - List<String> columnNameList = phoenixSerDe.getSerdeParams().getColumnNames(); - PhoenixPredicateDecomposer predicateDecomposer = PhoenixPredicateDecomposerManager - .createPredicateDecomposer(predicateKey, columnNameList); - return predicateDecomposer.decomposePredicate(predicate); + return PhoenixPredicateDecomposer.create(columnNameList).decomposePredicate(predicate); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/46d4bb4c/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java index 0944bb7..e3d0212 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java @@ -32,15 +32,14 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.RegionSizeCalculator; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; @@ -48,7 +47,6 @@ import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposer; -import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposerManager; import org.apache.phoenix.hive.ql.index.IndexSearchCondition; import org.apache.phoenix.hive.query.PhoenixQueryBuilder; import org.apache.phoenix.hive.util.PhoenixConnectionUtil; @@ -62,6 +60,7 @@ import org.apache.phoenix.util.PhoenixRuntime; import java.io.IOException; import java.sql.Connection; import java.sql.Statement; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Properties; @@ -83,8 +82,8 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { - String tableName = jobConf.get(PhoenixConfigurationUtil.INPUT_TABLE_NAME); - List<IndexSearchCondition> conditionList = null; + String tableName = jobConf.get(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME); + String query; String executionEngine = jobConf.get(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.getDefaultValue()); @@ -97,17 +96,17 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri } if (PhoenixStorageHandlerConstants.MR.equals(executionEngine)) { - String predicateKey = PhoenixStorageHandlerUtil.getTableKeyOfSession(jobConf, - tableName); - - if (LOG.isDebugEnabled()) { - LOG.debug("PredicateKey for MR job : " + predicateKey); - } - - PhoenixPredicateDecomposer predicateDecomposer = - PhoenixPredicateDecomposerManager.getPredicateDecomposer(predicateKey); - if (predicateDecomposer != null && predicateDecomposer.isCalledPPD()) { - conditionList = predicateDecomposer.getSearchConditionList(); + List<IndexSearchCondition> conditionList = null; + String filterExprSerialized = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR); + if (filterExprSerialized != null) { + ExprNodeGenericFuncDesc filterExpr = + Utilities.deserializeExpression(filterExprSerialized); + PhoenixPredicateDecomposer predicateDecomposer = + PhoenixPredicateDecomposer.create(Arrays.asList(jobConf.get(serdeConstants.LIST_COLUMNS).split(","))); + predicateDecomposer.decomposePredicate(filterExpr); + if (predicateDecomposer.isCalledPPD()) { + conditionList = predicateDecomposer.getSearchConditionList(); + } } query = PhoenixQueryBuilder.getInstance().buildQuery(jobConf, tableName, http://git-wip-us.apache.org/repos/asf/phoenix/blob/46d4bb4c/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java index b94e4df..1e65819 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposer.java @@ -42,11 +42,19 @@ public class PhoenixPredicateDecomposer { private List<IndexSearchCondition> searchConditionList; - public PhoenixPredicateDecomposer(List<String> columnNameList) { + public static PhoenixPredicateDecomposer create(List<String> columnNameList) { + return new PhoenixPredicateDecomposer(columnNameList); + } + + private PhoenixPredicateDecomposer(List<String> columnNameList) { this.columnNameList = columnNameList; } public DecomposedPredicate decomposePredicate(ExprNodeDesc predicate) { + if (LOG.isDebugEnabled()) { + LOG.debug("predicate - " + predicate.toString()); + } + IndexPredicateAnalyzer analyzer = PredicateAnalyzerFactory.createPredicateAnalyzer (columnNameList, getFieldValidator()); DecomposedPredicate decomposed = new DecomposedPredicate(); @@ -65,6 +73,11 @@ public class PhoenixPredicateDecomposer { } } + if (LOG.isDebugEnabled()) { + LOG.debug("decomposed predicate - residualPredicate: " + decomposed.residualPredicate + + ", pushedPredicate: " + decomposed.pushedPredicate); + } + return decomposed; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/46d4bb4c/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposerManager.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposerManager.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposerManager.java deleted file mode 100644 index 2faef73..0000000 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/ppd/PhoenixPredicateDecomposerManager.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.hive.ppd; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.util.List; -import java.util.Map; - -/** - * Support class that produces PredicateDecomposer for PhoenixStorageHandler - */ - -public class PhoenixPredicateDecomposerManager { - - private static final Log LOG = LogFactory.getLog(PhoenixPredicateDecomposerManager.class); - - // In case of absence of WHERE clause, PhoenixPredicateDecomposer is not created because - // it's not called method of StorageHandler.decomposePredicate. - - private static final Map<String, List<PhoenixPredicateDecomposer>> PREDICATE_DECOMPOSER_MAP = - Maps.newConcurrentMap(); - - public static PhoenixPredicateDecomposer createPredicateDecomposer(String predicateKey, - List<String> - columnNameList) { - List<PhoenixPredicateDecomposer> predicateDecomposerList = PREDICATE_DECOMPOSER_MAP.get - (predicateKey); - if (predicateDecomposerList == null) { - predicateDecomposerList = Lists.newArrayList(); - PREDICATE_DECOMPOSER_MAP.put(predicateKey, predicateDecomposerList); - } - - PhoenixPredicateDecomposer predicateDecomposer = new PhoenixPredicateDecomposer - (columnNameList); - predicateDecomposerList.add(predicateDecomposer); - - if (LOG.isDebugEnabled()) { - LOG.debug("Predicate-decomposer : " + PREDICATE_DECOMPOSER_MAP + " [" + - predicateKey + "] : " + predicateDecomposer); - } - - return predicateDecomposer; - } - - public static PhoenixPredicateDecomposer getPredicateDecomposer(String predicateKey) { - List<PhoenixPredicateDecomposer> predicateDecomposerList = PREDICATE_DECOMPOSER_MAP.get - (predicateKey); - - PhoenixPredicateDecomposer predicateDecomposer = null; - if (predicateDecomposerList != null && predicateDecomposerList.size() > 0) { - predicateDecomposer = predicateDecomposerList.remove(0); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Predicate-decomposer : " + PREDICATE_DECOMPOSER_MAP + " [" + predicateKey - + "] : " + predicateDecomposer); - } - - return predicateDecomposer; - } - - private PhoenixPredicateDecomposerManager() { - } -}