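PHOENIX-4791 Array elements are nullified with joins

Some queries project individual array elements on the server (SERVER ARRAY ELEMENT PROJECTION) and also run a hash join in the same scan. For these queries, HashJoinRegionScanner queued only the re-projected tuple. The array-element cell that RegionScannerFactory appends at the end of each row was therefore lost, and the selected elements came back null. HashJoinRegionScanner now receives the array column and array function references. When both are present, it re-attaches that cell to every projected tuple it emits.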
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/dedc04cc Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/dedc04cc Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/dedc04cc Branch: refs/heads/4.x-cdh5.15 Commit: dedc04cc3d323dff8c68d21cd91951ed44a7611c Parents: 1fcf43c Author: Gerald Sangudi <gsang...@23andme.com> Authored: Thu Aug 23 00:59:12 2018 +0100 Committer: Pedro Boado <pbo...@apache.org> Committed: Wed Oct 17 22:49:38 2018 +0100 ---------------------------------------------------------------------- .../ProjectArrayElemAfterHashJoinIT.java | 177 +++++++++++++++++++ .../coprocessor/HashJoinRegionScanner.java | 69 ++++++-- .../NonAggregateRegionScannerFactory.java | 5 +- .../phoenix/iterate/RegionScannerFactory.java | 7 +- 4 files changed, 243 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/dedc04cc/phoenix-core/src/it/java/org/apache/phoenix/end2end/ProjectArrayElemAfterHashJoinIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ProjectArrayElemAfterHashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ProjectArrayElemAfterHashJoinIT.java new file mode 100644 index 0000000..170eb69 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ProjectArrayElemAfterHashJoinIT.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Properties; + +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.junit.Test; + +public class ProjectArrayElemAfterHashJoinIT extends ParallelStatsDisabledIT { + + @Test + public void testSalted() throws Exception { + + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + + try { + String table = createSalted(conn); + testTable(conn, table); + } finally { + conn.close(); + } + } + + @Test + public void testUnsalted() throws Exception { + + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + + try { + String table = createUnsalted(conn); + testTable(conn, table); + } finally { + conn.close(); + } + } + + private void testTable(Connection conn, String table) throws Exception { + + verifyExplain(conn, table, false, false); + verifyExplain(conn, table, false, true); + verifyExplain(conn, table, true, false); + verifyExplain(conn, table, true, true); + + verifyResults(conn, table, false, false); + verifyResults(conn, table, false, true); + 
verifyResults(conn, table, true, false); + verifyResults(conn, table, true, true); + } + + private String createSalted(Connection conn) throws Exception { + + String table = "SALTED_" + generateUniqueName(); + String create = "CREATE TABLE " + table + " (" + + " id INTEGER NOT NULL," + + " vals TINYINT[]," + + " CONSTRAINT pk PRIMARY KEY (id)" + + ") SALT_BUCKETS = 4"; + + conn.createStatement().execute(create); + return table; + } + + private String createUnsalted(Connection conn) throws Exception { + + String table = "UNSALTED_" + generateUniqueName(); + String create = "CREATE TABLE " + table + " (" + + " id INTEGER NOT NULL," + + " vals TINYINT[]," + + " CONSTRAINT pk PRIMARY KEY (id)" + + ")"; + + conn.createStatement().execute(create); + return table; + } + + private String getQuery(String table, boolean fullArray, boolean hashJoin) { + + String query = "SELECT id, vals[1] v1, vals[2] v2, vals[3] v3, vals[4] v4" + + (fullArray ? ", vals" : "") + + " FROM " + table + + " WHERE id IN " + + (hashJoin ? 
"(SELECT 1)" : "(1, 2, 3)") + ; + + return query; + } + + private void verifyExplain(Connection conn, String table, boolean fullArray, boolean hashJoin) + throws Exception { + + String query = "EXPLAIN " + getQuery(table, fullArray, hashJoin); + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(query); + + try { + String plan = QueryUtil.getExplainPlan(rs); + assertTrue(plan != null); + assertTrue(fullArray || plan.contains("SERVER ARRAY ELEMENT PROJECTION")); + assertTrue(hashJoin == plan.contains("JOIN")); + } finally { + rs.close(); + } + } + + private void verifyResults(Connection conn, String table, boolean fullArray, boolean hashJoin) + throws Exception { + + String upsert = "UPSERT INTO " + table + "(id, vals)" + + " VALUES(1, ARRAY[10, 20, 30, 40, 50])"; + PreparedStatement upsertStmt = conn.prepareStatement(upsert); + upsertStmt.execute(); + conn.commit(); + + String query = getQuery(table, fullArray, hashJoin); + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(query); + + try { + assertTrue(rs.next()); + assertEquals(1, rs.getInt("id")); + assertEquals(10, rs.getInt("v1")); + assertEquals(20, rs.getInt("v2")); + assertEquals(30, rs.getInt("v3")); + assertEquals(40, rs.getInt("v4")); + + if (fullArray) { + java.sql.Array array = rs.getArray("vals"); + assertTrue(array != null); + Object obj = array.getArray(); + assertTrue(obj != null); + assertTrue(obj.getClass().isArray()); + assertEquals(5, java.lang.reflect.Array.getLength(obj)); + } + + assertFalse(rs.next()); + } finally { + rs.close(); + } + } + + private void dropTable(Connection conn, String table) throws Exception { + + String drop = "DROP TABLE " + table; + Statement stmt = conn.createStatement(); + stmt.execute(drop); + stmt.close(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/dedc04cc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index 59f844d..d82aaba 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -18,10 +18,12 @@ package org.apache.phoenix.coprocessor; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Queue; +import java.util.Set; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -37,12 +39,15 @@ import org.apache.phoenix.cache.TenantCache; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple; import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.KeyValueColumnExpression; +import org.apache.phoenix.iterate.RegionScannerFactory; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.parse.JoinTableNode.JoinType; import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.ValueBitSet; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.schema.tuple.PositionBasedResultTuple; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; @@ -66,9 +71,27 @@ public class HashJoinRegionScanner implements RegionScanner { private ValueBitSet[] tempSrcBitSet; private final boolean useQualifierAsListIndex; private final boolean useNewValueColumnQualifier; - + private final boolean addArrayCell; + + @SuppressWarnings("unchecked") + public 
HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, + HashJoinInfo joinInfo, ImmutableBytesPtr tenantId, + RegionCoprocessorEnvironment env, boolean useQualifierAsIndex, + boolean useNewValueColumnQualifier) + throws IOException { + + this(env, scanner, null, null, projector, joinInfo, + tenantId, useQualifierAsIndex, useNewValueColumnQualifier); + } + @SuppressWarnings("unchecked") - public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesPtr tenantId, RegionCoprocessorEnvironment env, boolean useQualifierAsIndex, boolean useNewValueColumnQualifier) throws IOException { + public HashJoinRegionScanner(RegionCoprocessorEnvironment env, RegionScanner scanner, + final Set<KeyValueColumnExpression> arrayKVRefs, + final Expression[] arrayFuncRefs, TupleProjector projector, + HashJoinInfo joinInfo, ImmutableBytesPtr tenantId, + boolean useQualifierAsIndex, boolean useNewValueColumnQualifier) + throws IOException { + this.env = env; this.scanner = scanner; this.projector = projector; @@ -103,7 +126,7 @@ public class HashJoinRegionScanner implements RegionScanner { Bytes.toLong(ByteUtil.copyKeyBytesIfNecessary(joinId))); throw new DoNotRetryIOException(cause.getMessage(), cause); } - + hashCaches[i] = hashCache; tempSrcBitSet[i] = ValueBitSet.newInstance(joinInfo.getSchemas()[i]); } @@ -113,16 +136,21 @@ public class HashJoinRegionScanner implements RegionScanner { } this.useQualifierAsListIndex = useQualifierAsIndex; this.useNewValueColumnQualifier = useNewValueColumnQualifier; + this.addArrayCell = (arrayFuncRefs != null && arrayFuncRefs.length > 0 && + arrayKVRefs != null && arrayKVRefs.size() > 0); } private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException { if (result.isEmpty()) return; Tuple tuple = useQualifierAsListIndex ? 
new PositionBasedResultTuple(result) : new ResultTuple(Result.create(result)); + boolean projected = false; + // For backward compatibility. In new versions, HashJoinInfo.forceProjection() // always returns true. if (joinInfo.forceProjection()) { tuple = projector.projectResults(tuple, useNewValueColumnQualifier); + projected = true; } // TODO: fix below Scanner.next() and Scanner.nextRaw() methods as well. @@ -150,14 +178,15 @@ public class HashJoinRegionScanner implements RegionScanner { dup *= (tempTuples[i] == null ? 1 : tempTuples[i].size()); } for (int i = 0; i < dup; i++) { - resultQueue.offer(tuple); + offerResult(tuple, projected, result); } } else { KeyValueSchema schema = joinInfo.getJoinedSchema(); if (!joinInfo.forceProjection()) { // backward compatibility tuple = projector.projectResults(tuple, useNewValueColumnQualifier); + projected = true; } - resultQueue.offer(tuple); + offerResult(tuple, projected, result); for (int i = 0; i < count; i++) { boolean earlyEvaluation = joinInfo.earlyEvaluation()[i]; JoinType type = joinInfo.getJoinTypes()[i]; @@ -173,7 +202,7 @@ public class HashJoinRegionScanner implements RegionScanner { if (type == JoinType.Inner || type == JoinType.Semi) { continue; } else if (type == JoinType.Anti) { - resultQueue.offer(lhs); + offerResult(lhs, projected, result); continue; } } @@ -182,18 +211,18 @@ public class HashJoinRegionScanner implements RegionScanner { Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ? lhs : TupleProjector.mergeProjectedValue( (ProjectedValueTuple) lhs, schema, tempDestBitSet, - null, joinInfo.getSchemas()[i], tempSrcBitSet[i], + null, joinInfo.getSchemas()[i], tempSrcBitSet[i], joinInfo.getFieldPositions()[i], useNewValueColumnQualifier); - resultQueue.offer(joined); + offerResult(joined, projected, result); continue; } for (Tuple t : tempTuples[i]) { Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ? 
lhs : TupleProjector.mergeProjectedValue( (ProjectedValueTuple) lhs, schema, tempDestBitSet, - t, joinInfo.getSchemas()[i], tempSrcBitSet[i], + t, joinInfo.getSchemas()[i], tempSrcBitSet[i], joinInfo.getFieldPositions()[i], useNewValueColumnQualifier); - resultQueue.offer(joined); + offerResult(joined, projected, result); } } } @@ -265,7 +294,7 @@ public class HashJoinRegionScanner implements RegionScanner { processResults(result, false); result.clear(); } - + return nextInQueue(result); } catch (Throwable t) { ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t); @@ -309,5 +338,21 @@ public class HashJoinRegionScanner implements RegionScanner { return this.scanner.getBatch(); } -} + // PHOENIX-4791 Propagate array element cell through hash join + private void offerResult(Tuple tuple, boolean projected, List<Cell> result) { + if (!projected || !addArrayCell) { + resultQueue.offer(tuple); + return; + } + Cell projectedCell = tuple.getValue(0); + int arrayCellPosition = RegionScannerFactory.getArrayCellPosition(result); + Cell arrayCell = result.get(arrayCellPosition); + + List<Cell> cells = new ArrayList<Cell>(2); + cells.add(projectedCell); + cells.add(arrayCell); + MultiKeyValueTuple multi = new MultiKeyValueTuple(cells); + resultQueue.offer(multi); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/dedc04cc/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java index cc7221e..1504a7c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java @@ -152,8 +152,9 @@ public class 
NonAggregateRegionScannerFactory extends RegionScannerFactory { final ImmutableBytesPtr tenantId = ScanUtil.getTenantId(scan); if (j != null) { - innerScanner = new HashJoinRegionScanner(innerScanner, p, j, tenantId, env, useQualifierAsIndex, - useNewValueColumnQualifier); + innerScanner = new HashJoinRegionScanner(env, innerScanner, arrayKVRefs, arrayFuncRefs, + p, j, tenantId, useQualifierAsIndex, + useNewValueColumnQualifier); } if (scanOffset != null) { innerScanner = getOffsetScanner(innerScanner, new OffsetResultIterator( http://git-wip-us.apache.org/repos/asf/phoenix/blob/dedc04cc/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java index aed5805..b47d6b8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java @@ -286,7 +286,7 @@ public abstract class RegionScannerFactory { QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER, 0, QueryConstants.ARRAY_VALUE_COLUMN_QUALIFIER.length, HConstants.LATEST_TIMESTAMP, KeyValue.Type.codeToType(rowKv.getTypeByte()), value, 0, value.length)); - return result.size() - 1; + return getArrayCellPosition(result); } @Override @@ -300,4 +300,9 @@ public abstract class RegionScannerFactory { } }; } + + // PHOENIX-4791 Share position of array element cell + public static int getArrayCellPosition(List<Cell> result) { + return result.size() - 1; + } }