http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index 1611466..f5c9295 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -75,6 +75,7 @@ import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.protobuf.HBaseZeroCopyByteString; import com.sun.istack.NotNull; import co.cask.tephra.TxConstants; @@ -1016,27 +1017,17 @@ public class PTableImpl implements PTable { boolean isImmutableRows = table.getIsImmutableRows(); SortedMap<byte[], GuidePostsInfo> tableGuidePosts = new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR); - for (PTableProtos.PTableStats pTableStatsProto : table.getGuidePostsList()) { - List<byte[]> value = Lists.newArrayListWithExpectedSize(pTableStatsProto.getValuesCount()); - for (int j = 0; j < pTableStatsProto.getValuesCount(); j++) { - value.add(pTableStatsProto.getValues(j).toByteArray()); - } - // No op - pTableStatsProto.getGuidePostsByteCount(); - value = Lists.newArrayListWithExpectedSize(pTableStatsProto.getValuesCount()); - PGuidePosts pGuidePosts = pTableStatsProto.getPGuidePosts(); - for(int j = 0; j < pGuidePosts.getGuidePostsCount(); j++) { - value.add(pGuidePosts.getGuidePosts(j).toByteArray()); - } - long guidePostsByteCount = pGuidePosts.getByteCount(); - long rowCount = pGuidePosts.getRowCount(); - // TODO : Not exposing MIN/MAX key outside to client - GuidePostsInfo info = - new GuidePostsInfo(guidePostsByteCount, value, rowCount); - 
tableGuidePosts.put(pTableStatsProto.getKey().toByteArray(), info); + for (PTableProtos.PTableStats pTableStatsProto : table.getGuidePostsList()) { + PGuidePosts pGuidePosts = pTableStatsProto.getPGuidePosts(); + long guidePostsByteCount = pGuidePosts.getByteCount(); + long rowCount = pGuidePosts.getRowCount(); + int maxLength = pGuidePosts.getMaxLength(); + int guidePostsCount = pGuidePosts.getEncodedGuidePostsCount(); + GuidePostsInfo info = new GuidePostsInfo(guidePostsByteCount, + new ImmutableBytesWritable(HBaseZeroCopyByteString.zeroCopyGetBytes(pGuidePosts.getEncodedGuidePosts())), rowCount, maxLength, guidePostsCount); + tableGuidePosts.put(pTableStatsProto.getKey().toByteArray(), info); } PTableStats stats = new PTableStatsImpl(tableGuidePosts, table.getStatsTimeStamp()); - PName dataTableName = null; if (table.hasDataTableNameBytes()) { dataTableName = PNameFactory.newName(table.getDataTableNameBytes().toByteArray()); @@ -1141,16 +1132,14 @@ public class PTableImpl implements PTable { for (Map.Entry<byte[], GuidePostsInfo> entry : table.getTableStats().getGuidePosts().entrySet()) { PTableProtos.PTableStats.Builder statsBuilder = PTableProtos.PTableStats.newBuilder(); statsBuilder.setKey(ByteStringer.wrap(entry.getKey())); - for (byte[] stat : entry.getValue().getGuidePosts()) { - statsBuilder.addValues(ByteStringer.wrap(stat)); - } statsBuilder.setGuidePostsByteCount(entry.getValue().getByteCount()); + statsBuilder.setGuidePostsCount(entry.getValue().getGuidePostsCount()); PGuidePostsProtos.PGuidePosts.Builder guidePstsBuilder = PGuidePostsProtos.PGuidePosts.newBuilder(); - for (byte[] stat : entry.getValue().getGuidePosts()) { - guidePstsBuilder.addGuidePosts(ByteStringer.wrap(stat)); - } + guidePstsBuilder.setEncodedGuidePosts(ByteStringer.wrap(entry.getValue().getGuidePosts().get())); guidePstsBuilder.setByteCount(entry.getValue().getByteCount()); guidePstsBuilder.setRowCount(entry.getValue().getRowCount()); + 
guidePstsBuilder.setMaxLength(entry.getValue().getMaxLength()); + guidePstsBuilder.setEncodedGuidePostsCount(entry.getValue().getGuidePostsCount()); statsBuilder.setPGuidePosts(guidePstsBuilder); builder.addGuidePosts(statsBuilder.build()); }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java index 0f1dbeb..da7d3a5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfo.java @@ -17,14 +17,8 @@ */ package org.apache.phoenix.schema.stats; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.util.Bytes; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.util.ByteUtil; /** * A class that holds the guidePosts of a region and also allows combining the * guidePosts of different regions when the GuidePostsInfo is formed for a table. @@ -34,7 +28,7 @@ public class GuidePostsInfo { /** * the total number of guidePosts for the table combining all the guidePosts per region per cf. */ - private List<byte[]> guidePosts; + private ImmutableBytesWritable guidePosts; /** * The bytecount that is flattened across the total number of guide posts. 
*/ @@ -45,7 +39,19 @@ public class GuidePostsInfo { */ private long rowCount = 0; - private long keyByteSize; // Total number of bytes in keys stored in guidePosts + /** + * Maximum length of a guidePost collected + */ + private int maxLength; + + public final static GuidePostsInfo EMPTY_GUIDEPOST = new GuidePostsInfo(0, + new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY), 0, 0, 0); + + public int getMaxLength() { + return maxLength; + } + + private int guidePostsCount; /** * Constructor that creates GuidePostsInfo per region @@ -53,22 +59,20 @@ public class GuidePostsInfo { * @param guidePosts * @param rowCount */ - public GuidePostsInfo(long byteCount, List<byte[]> guidePosts, long rowCount) { - this.guidePosts = ImmutableList.copyOf(guidePosts); - int size = 0; - for (byte[] key : guidePosts) { - size += key.length; - } - this.keyByteSize = size; + public GuidePostsInfo(long byteCount, ImmutableBytesWritable guidePosts, long rowCount, int maxLength, int guidePostsCount) { + this.guidePosts = new ImmutableBytesWritable(guidePosts); + this.maxLength = maxLength; this.byteCount = byteCount; this.rowCount = rowCount; + this.guidePostsCount = guidePostsCount; } + public long getByteCount() { return byteCount; } - public List<byte[]> getGuidePosts() { + public ImmutableBytesWritable getGuidePosts() { return guidePosts; } @@ -76,70 +80,8 @@ public class GuidePostsInfo { return this.rowCount; } - public void incrementRowCount() { - this.rowCount++; - } - - /** - * Combines the GuidePosts per region into one. 
- * @param oldInfo - */ - public void combine(GuidePostsInfo oldInfo) { - if (!oldInfo.getGuidePosts().isEmpty()) { - byte[] newFirstKey = oldInfo.getGuidePosts().get(0); - byte[] existingLastKey; - if (!this.getGuidePosts().isEmpty()) { - existingLastKey = this.getGuidePosts().get(this.getGuidePosts().size() - 1); - } else { - existingLastKey = HConstants.EMPTY_BYTE_ARRAY; - } - int size = oldInfo.getGuidePosts().size(); - // If the existing guidePosts is lesser than the new RegionInfo that we are combining - // then add the new Region info to the end of the current GuidePosts. - // If the new region info is smaller than the existing guideposts then add the existing - // guide posts after the new guideposts. - List<byte[]> newTotalGuidePosts = new ArrayList<byte[]>(this.getGuidePosts().size() + size); - if (Bytes.compareTo(existingLastKey, newFirstKey) <= 0) { - newTotalGuidePosts.addAll(this.getGuidePosts()); - newTotalGuidePosts.addAll(oldInfo.getGuidePosts()); - } else { - newTotalGuidePosts.addAll(oldInfo.getGuidePosts()); - newTotalGuidePosts.addAll(this.getGuidePosts()); - } - this.guidePosts = ImmutableList.copyOf(newTotalGuidePosts); - } - this.byteCount += oldInfo.getByteCount(); - this.keyByteSize += oldInfo.keyByteSize; - this.rowCount += oldInfo.getRowCount(); - } - - /** - * The guide posts, rowCount and byteCount are accumulated every time a guidePosts depth is - * reached while collecting stats. 
- * @param row - * @param byteCount - * @return - */ - public boolean addGuidePost(byte[] row, long byteCount, long rowCount) { - if (guidePosts.isEmpty() || Bytes.compareTo(row, guidePosts.get(guidePosts.size() - 1)) > 0) { - List<byte[]> newGuidePosts = Lists.newArrayListWithExpectedSize(this.getGuidePosts().size() + 1); - newGuidePosts.addAll(guidePosts); - newGuidePosts.add(row); - this.guidePosts = ImmutableList.copyOf(newGuidePosts); - this.byteCount += byteCount; - this.keyByteSize += row.length; - this.rowCount+=rowCount; - return true; - } - return false; - } - - public boolean addGuidePost(byte[] row) { - return addGuidePost(row, 0, 0); - } - - public boolean addGuidePost(byte[] row, long byteCount) { - return addGuidePost(row, byteCount, 0); + public int getGuidePostsCount() { + return guidePostsCount; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfoBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfoBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfoBuilder.java new file mode 100644 index 0000000..f3ada82 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/GuidePostsInfoBuilder.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema.stats; + +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.PrefixByteCodec; +import org.apache.phoenix.util.PrefixByteEncoder; +import org.apache.phoenix.util.TrustedByteArrayOutputStream; + +/* + * Builder to help in adding guidePosts and building guidePostInfo. This is used when we are collecting stats or reading stats for a table. + */ + +public class GuidePostsInfoBuilder { + private PrefixByteEncoder encoder; + private byte[] lastRow; + private ImmutableBytesWritable guidePosts=new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY); + private long byteCount = 0; + private int guidePostsCount; + + /** + * The rowCount that is flattened across the total number of guide posts. 
+ */ + private long rowCount = 0; + + /** + * Maximum length of a guidePost collected + */ + private int maxLength; + private DataOutputStream output; + private TrustedByteArrayOutputStream stream; + + public final static GuidePostsInfo EMPTY_GUIDEPOST = new GuidePostsInfo(0, + new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY), 0, 0, 0); + + public int getMaxLength() { + return maxLength; + } + public GuidePostsInfoBuilder(){ + this.stream = new TrustedByteArrayOutputStream(1); + this.output = new DataOutputStream(stream); + this.encoder=new PrefixByteEncoder(); + lastRow = ByteUtil.EMPTY_BYTE_ARRAY; + } + + /** + * The guide posts, rowCount and byteCount are accumulated every time a guidePosts depth is + * reached while collecting stats. + * @param row + * @param byteCount + * @return + * @throws IOException + */ + public boolean addGuidePosts( byte[] row, long byteCount, long rowCount) { + if (row.length != 0 && Bytes.compareTo(lastRow, row) < 0) { + try { + encoder.encode(output, row, 0, row.length); + this.byteCount += byteCount; + this.guidePostsCount++; + this.maxLength = encoder.getMaxLength(); + this.rowCount += rowCount; + lastRow = row; + return true; + } catch (IOException e) { + return false; + } + } + return false; + } + + public boolean addGuidePosts(byte[] row){ + return addGuidePosts(row, 0, 0); + } + + public boolean addGuidePosts(byte[] row, long byteCount){ + return addGuidePosts(row, byteCount, 0); + } + + private void close() { + PrefixByteCodec.close(stream); + } + + public GuidePostsInfo build() { + this.guidePosts.set(stream.getBuffer(), 0, stream.size()); + GuidePostsInfo guidePostsInfo = new GuidePostsInfo(this.byteCount, this.guidePosts, this.rowCount, this.maxLength, this.guidePostsCount); + this.close(); + return guidePostsInfo; + } + public void incrementRowCount() { + this.rowCount++; + } + +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java index dc70e86..dacc213 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/PTableStatsImpl.java @@ -17,13 +17,19 @@ */ package org.apache.phoenix.schema.stats; -import java.util.List; +import java.io.ByteArrayInputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.EOFException; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.util.PrefixByteCodec; +import org.apache.phoenix.util.PrefixByteDecoder; import org.apache.phoenix.util.SizedUtil; import com.sun.istack.NotNull; @@ -47,12 +53,10 @@ public class PTableStatsImpl implements PTableStats { for (Map.Entry<byte[], GuidePostsInfo> entry : guidePosts.entrySet()) { byte[] cf = entry.getKey(); estimatedSize += SizedUtil.ARRAY_SIZE + cf.length; - List<byte[]> keys = entry.getValue().getGuidePosts(); - estimatedSize += SizedUtil.sizeOfArrayList(keys.size()); - for (byte[] key : keys) { - estimatedSize += SizedUtil.ARRAY_SIZE + key.length; - } + estimatedSize += entry.getValue().getGuidePosts().getLength(); estimatedSize += SizedUtil.LONG_SIZE; + estimatedSize += SizedUtil.INT_SIZE; + estimatedSize += SizedUtil.INT_SIZE; } this.estimatedSize = estimatedSize; } @@ -65,19 +69,34 @@ public class PTableStatsImpl implements PTableStats { @Override public String toString() { StringBuilder buf = new 
StringBuilder(); + buf.append("PTableStats ["); for (Map.Entry<byte[], GuidePostsInfo> entry : guidePosts.entrySet()) { buf.append(Bytes.toStringBinary(entry.getKey())); buf.append(":("); - List<byte[]> keys = entry.getValue().getGuidePosts(); - if (!keys.isEmpty()) { - for (byte[] key : keys) { - buf.append(Bytes.toStringBinary(key)); - buf.append(","); + ImmutableBytesWritable keys = entry.getValue().getGuidePosts(); + ByteArrayInputStream stream = new ByteArrayInputStream(keys.get(), keys.getOffset(), keys.getLength()); + try { + if (keys.getLength() != 0) { + DataInput input = new DataInputStream(stream); + PrefixByteDecoder decoder = new PrefixByteDecoder(entry.getValue().getMaxLength()); + try { + while (true) { + ImmutableBytesWritable ptr = PrefixByteCodec.decode(decoder, input); + buf.append(Bytes.toStringBinary(ptr.get())); + buf.append(","); + } + } catch (EOFException e) { // Ignore as this signifies we're done + + } finally { + PrefixByteCodec.close(stream); + } + buf.setLength(buf.length() - 1); } - buf.setLength(buf.length()-1); + buf.append(")"); + } finally { + PrefixByteCodec.close(stream); } - buf.append(")"); } buf.append("]"); return buf.toString(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java index fc8d8bd..3462f22 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsCollector.java @@ -19,7 +19,6 @@ package org.apache.phoenix.schema.stats; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -61,9 
+60,9 @@ public class StatisticsCollector { private long guidepostDepth; private long maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP; - private Map<ImmutableBytesPtr, Pair<Long, GuidePostsInfo>> guidePostsMap = Maps.newHashMap(); + private Map<ImmutableBytesPtr, Pair<Long, GuidePostsInfoBuilder>> guidePostsInfoWriterMap = Maps.newHashMap(); protected StatisticsWriter statsTable; - private Pair<Long, GuidePostsInfo> cachedGps = null; + private Pair<Long, GuidePostsInfoBuilder> cachedGps = null; public StatisticsCollector(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp) throws IOException { @@ -99,8 +98,8 @@ public class StatisticsCollector { // in a compaction we know the one family ahead of time if (family != null) { ImmutableBytesPtr cfKey = new ImmutableBytesPtr(family); - cachedGps = new Pair<Long, GuidePostsInfo>(0l, new GuidePostsInfo(0, Collections.<byte[]> emptyList(), 0l)); - guidePostsMap.put(cfKey, cachedGps); + cachedGps = new Pair<Long, GuidePostsInfoBuilder>(0l, new GuidePostsInfoBuilder()); + guidePostsInfoWriterMap.put(cfKey, cachedGps); } } @@ -131,7 +130,7 @@ public class StatisticsCollector { throws IOException { try { // update the statistics table - for (ImmutableBytesPtr fam : guidePostsMap.keySet()) { + for (ImmutableBytesPtr fam : guidePostsInfoWriterMap.keySet()) { if (delete) { if (logger.isDebugEnabled()) { logger.debug("Deleting the stats for the region " + region.getRegionInfo()); @@ -161,22 +160,22 @@ public class StatisticsCollector { */ public void collectStatistics(final List<Cell> results) { Map<ImmutableBytesPtr, Boolean> famMap = Maps.newHashMap(); - List<GuidePostsInfo> rowTracker = null; + List<GuidePostsInfoBuilder> rowTracker = null; if (cachedGps == null) { - rowTracker = new ArrayList<GuidePostsInfo>(); + rowTracker = new ArrayList<GuidePostsInfoBuilder>(); } for (Cell cell : results) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); maxTimeStamp = Math.max(maxTimeStamp, kv.getTimestamp()); - 
Pair<Long, GuidePostsInfo> gps; + Pair<Long, GuidePostsInfoBuilder> gps; if (cachedGps == null) { ImmutableBytesPtr cfKey = new ImmutableBytesPtr(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength()); - gps = guidePostsMap.get(cfKey); + gps = guidePostsInfoWriterMap.get(cfKey); if (gps == null) { - gps = new Pair<Long, GuidePostsInfo>(0l, - new GuidePostsInfo(0, Collections.<byte[]> emptyList(), 0l)); - guidePostsMap.put(cfKey, gps); + gps = new Pair<Long, GuidePostsInfoBuilder>(0l, + new GuidePostsInfoBuilder()); + guidePostsInfoWriterMap.put(cfKey, gps); } if (famMap.get(cfKey) == null) { famMap.put(cfKey, true); @@ -191,13 +190,13 @@ public class StatisticsCollector { if (byteCount >= guidepostDepth) { byte[] row = ByteUtil.copyKeyBytesIfNecessary( new ImmutableBytesWritable(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); - if (gps.getSecond().addGuidePost(row, byteCount)) { + if (gps.getSecond().addGuidePosts(row, byteCount)) { gps.setFirst(0l); } } } if (cachedGps == null) { - for (GuidePostsInfo s : rowTracker) { + for (GuidePostsInfoBuilder s : rowTracker) { s.incrementRowCount(); } } else { @@ -221,24 +220,13 @@ public class StatisticsCollector { } public void clear() { - this.guidePostsMap.clear(); + this.guidePostsInfoWriterMap.clear(); maxTimeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP; } - public void addGuidePost(ImmutableBytesPtr cfKey, GuidePostsInfo info, long byteSize, long timestamp, - byte[] minKey) { - Pair<Long, GuidePostsInfo> newInfo = new Pair<Long, GuidePostsInfo>(byteSize, info); - Pair<Long, GuidePostsInfo> oldInfo = guidePostsMap.put(cfKey, newInfo); - if (oldInfo != null) { - info.combine(oldInfo.getSecond()); - newInfo.setFirst(oldInfo.getFirst() + newInfo.getFirst()); - } - maxTimeStamp = Math.max(maxTimeStamp, timestamp); - } - public GuidePostsInfo getGuidePosts(ImmutableBytesPtr fam) { - Pair<Long, GuidePostsInfo> pair = guidePostsMap.get(fam); - if (pair != null) { return pair.getSecond(); } + Pair<Long, 
GuidePostsInfoBuilder> pair = guidePostsInfoWriterMap.get(fam); + if (pair != null) { return pair.getSecond().build(); } return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java index d6f0bf1..13e9491 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java @@ -134,7 +134,8 @@ public class StatisticsScanner implements InternalScanner { } finally { try { collectionTracker.removeCompactingRegion(regionInfo); - stats.close(); + stats.close();// close the writer + tracker.close();// close the tracker } catch (IOException e) { if (toThrow == null) toThrow = e; LOG.error("Error while closing the stats table", e); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java index 16f249c..5b47104 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java @@ -21,8 +21,8 @@ import static org.apache.phoenix.util.SchemaUtil.getVarCharLength; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; +import java.util.SortedMap; import java.util.TreeMap; import org.apache.hadoop.hbase.Cell; @@ -55,10 
+55,14 @@ public class StatisticsUtil { /** Number of parts in our complex key */ protected static final int NUM_KEY_PARTS = 3; - + public static byte[] getRowKey(byte[] table, ImmutableBytesPtr fam, byte[] guidePostStartKey) { + return getRowKey(table, fam, new ImmutableBytesWritable(guidePostStartKey,0,guidePostStartKey.length)); + } + + public static byte[] getRowKey(byte[] table, ImmutableBytesPtr fam, ImmutableBytesWritable guidePostStartKey) { // always starts with the source table - byte[] rowKey = new byte[table.length + fam.getLength() + guidePostStartKey.length + 2]; + byte[] rowKey = new byte[table.length + fam.getLength() + guidePostStartKey.getLength() + 2]; int offset = 0; System.arraycopy(table, 0, rowKey, offset, table.length); offset += table.length; @@ -66,7 +70,7 @@ public class StatisticsUtil { System.arraycopy(fam.get(), fam.getOffset(), rowKey, offset, fam.getLength()); offset += fam.getLength(); rowKey[offset++] = QueryConstants.SEPARATOR_BYTE; // assumes stats table columns not DESC - System.arraycopy(guidePostStartKey, 0, rowKey, offset, guidePostStartKey.length); + System.arraycopy(guidePostStartKey.get(), 0, rowKey, offset, guidePostStartKey.getLength()); return rowKey; } @@ -126,7 +130,7 @@ public class StatisticsUtil { s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES); ResultScanner scanner = null; long timeStamp = MetaDataProtocol.MIN_TABLE_TIMESTAMP; - TreeMap<byte[], GuidePostsInfo> guidePostsPerCf = new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR); + TreeMap<byte[], GuidePostsInfoBuilder> guidePostsInfoWriterPerCf = new TreeMap<byte[], GuidePostsInfoBuilder>(Bytes.BYTES_COMPARATOR); try { scanner = statsHTable.getScanner(s); Result result = null; @@ -168,23 +172,33 @@ public class StatisticsUtil { } if (cfName != null) { byte[] newGPStartKey = getGuidePostsInfoFromRowKey(tableNameBytes, cfName, result.getRow()); - GuidePostsInfo guidePosts = guidePostsPerCf.get(cfName); - if 
(guidePosts == null) { - guidePosts = new GuidePostsInfo(0l, Collections.<byte[]> emptyList(), 0l); - guidePostsPerCf.put(cfName, guidePosts); + GuidePostsInfoBuilder guidePostsInfoWriter = guidePostsInfoWriterPerCf.get(cfName); + if (guidePostsInfoWriter == null) { + guidePostsInfoWriter = new GuidePostsInfoBuilder(); + guidePostsInfoWriterPerCf.put(cfName, guidePostsInfoWriter); } - guidePosts.addGuidePost(newGPStartKey, byteCount, rowCount); + guidePostsInfoWriter.addGuidePosts(newGPStartKey, byteCount, rowCount); } } + if (!guidePostsInfoWriterPerCf.isEmpty()) { return new PTableStatsImpl( + getGuidePostsPerCf(guidePostsInfoWriterPerCf), timeStamp); } } finally { if (scanner != null) { scanner.close(); } } - if (!guidePostsPerCf.isEmpty()) { return new PTableStatsImpl(guidePostsPerCf, timeStamp); } return PTableStats.EMPTY_STATS; } + private static SortedMap<byte[], GuidePostsInfo> getGuidePostsPerCf( + TreeMap<byte[], GuidePostsInfoBuilder> guidePostsWriterPerCf) { + TreeMap<byte[], GuidePostsInfo> guidePostsPerCf = new TreeMap<byte[], GuidePostsInfo>(Bytes.BYTES_COMPARATOR); + for (byte[] key : guidePostsWriterPerCf.keySet()) { + guidePostsPerCf.put(key, guidePostsWriterPerCf.get(key).build()); + } + return guidePostsPerCf; + } + public static long getGuidePostDepth(int guidepostPerRegion, long guidepostWidth, HTableDescriptor tableDesc) { if (guidepostPerRegion > 0) { long maxFileSize = HConstants.DEFAULT_MAX_FILE_SIZE; http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java index b8bd064..d03af7a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java @@ -17,7 +17,11 @@ */ package org.apache.phoenix.schema.stats; +import java.io.ByteArrayInputStream; import java.io.Closeable; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.EOFException; import java.io.IOException; import java.sql.Date; import java.util.List; @@ -45,6 +49,8 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.types.PDate; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.PrefixByteCodec; +import org.apache.phoenix.util.PrefixByteDecoder; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TimeKeeper; @@ -103,6 +109,7 @@ public class StatisticsWriter implements Closeable { @Override public void close() throws IOException { statsWriterTable.close(); + statsReaderTable.close(); } /** @@ -134,21 +141,32 @@ public class StatisticsWriter implements Closeable { GuidePostsInfo gps = tracker.getGuidePosts(cfKey); if (gps != null) { boolean rowColumnAdded = false; - for (byte[] gp : gps.getGuidePosts()) { - byte[] prefix = StatisticsUtil.getRowKey(tableName, cfKey, gp); - Put put = new Put(prefix); - if (!rowColumnAdded) { - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES, - timeStamp, PLong.INSTANCE.toBytes(gps.getByteCount())); - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT_BYTES, timeStamp, - PLong.INSTANCE.toBytes(gps.getRowCount())); - rowColumnAdded = true; + ImmutableBytesWritable keys = gps.getGuidePosts(); + ByteArrayInputStream stream = new ByteArrayInputStream(keys.get(), keys.getOffset(), keys.getLength()); + DataInput input = new DataInputStream(stream); + PrefixByteDecoder decoder = new PrefixByteDecoder(gps.getMaxLength()); + try { + while (true) { + ImmutableBytesWritable ptr = decoder.decode(input); + 
byte[] prefix = StatisticsUtil.getRowKey(tableName, cfKey, ptr); + Put put = new Put(prefix); + if (!rowColumnAdded) { + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES, + timeStamp, PLong.INSTANCE.toBytes(gps.getByteCount())); + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, + PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT_BYTES, timeStamp, + PLong.INSTANCE.toBytes(gps.getRowCount())); + rowColumnAdded = true; + } + // Add our empty column value so queries behave correctly + put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, + ByteUtil.EMPTY_BYTE_ARRAY); + mutations.add(put); } - // Add our empty column value so queries behave correctly - put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, - ByteUtil.EMPTY_BYTE_ARRAY); - mutations.add(put); + } catch (EOFException e) { // Ignore as this signifies we're done + + } finally { + PrefixByteCodec.close(stream); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java index 64d064a..44502a5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ByteUtil.java @@ -49,6 +49,8 @@ public class ByteUtil { public static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; public static final ImmutableBytesPtr EMPTY_BYTE_ARRAY_PTR = new ImmutableBytesPtr( EMPTY_BYTE_ARRAY); + public static final ImmutableBytesWritable EMPTY_IMMUTABLE_BYTE_ARRAY = new ImmutableBytesWritable( + EMPTY_BYTE_ARRAY); public static final Comparator<ImmutableBytesPtr> BYTES_PTR_COMPARATOR = new Comparator<ImmutableBytesPtr>() { 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteCodec.java new file mode 100644 index 0000000..8c3aa80 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteCodec.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; + +import com.google.common.collect.Lists; + +/** + * Helpers for prefix-encoding a list of byte arrays into a single buffer with + * {@link PrefixByteEncoder} and decoding such a buffer back with {@link PrefixByteDecoder}. + */ +public class PrefixByteCodec { + + /** + * Decodes every value stored in {@code encodedBytes}. + * <p> + * Decoding stops at the first EOFException. NOTE(review): a value truncated at the end of + * the buffer also surfaces as EOFException and is silently dropped rather than reported. + * @param encodedBytes prefix-encoded buffer (e.g. produced by {@link #encodeBytes(List, ImmutableBytesWritable)}) + * @param maxLength length of the longest encoded value (as returned by + * {@link #encodeBytes(List, ImmutableBytesWritable)}); when it is not positive the + * decoder allocates a new array per value instead of reusing a single buffer + * @return a new list holding a copy of each decoded value, in encoding order + * @throws IOException if reading the encoded data fails with anything other than EOF + */ + public static List<byte[]> decodeBytes(ImmutableBytesWritable encodedBytes, int maxLength) throws IOException { + ByteArrayInputStream stream = new ByteArrayInputStream(encodedBytes.get(), encodedBytes.getOffset(), encodedBytes.getLength()); + DataInput input = new DataInputStream(stream); + PrefixByteDecoder decoder = new PrefixByteDecoder(maxLength); + List<byte[]> listOfBytes = Lists.newArrayList(); + try { + while (true) { + ImmutableBytesWritable ptr = decoder.decode(input); + // Copy the bytes: the decoder returns the same pointer on every call (and may + // reuse its buffer), so the next decode would otherwise overwrite this value + 
listOfBytes.add(ptr.copyBytes()); + } + } catch (EOFException e) { // Ignore as this signifies we're done: EOF is how the end of the encoded values is detected + + } + return listOfBytes; + } + + /** + * Prefix-encodes {@code listOfBytes}, in list order, into a new buffer and points + * {@code ptr} at the encoded bytes. + * @param listOfBytes values to encode + * @param ptr set to the encoded bytes + * @return the length of the longest value encoded, suitable as the {@code maxLength} + * argument of {@link #decodeBytes(ImmutableBytesWritable, int)} + * @throws IOException if writing to the in-memory stream fails + */ + public static int encodeBytes(List<byte[]> listOfBytes, ImmutableBytesWritable ptr) throws IOException { + TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(calculateSize(listOfBytes)); + DataOutput output = new DataOutputStream(stream); + PrefixByteEncoder encoder = new PrefixByteEncoder(); + for (byte[] bytes : listOfBytes) { + encoder.encode(output, bytes, 0, bytes.length); + } + close(stream); + ptr.set(stream.getBuffer(), 0, stream.size()); + return encoder.getMaxLength(); + } + + /** + * @return the sum of the lengths of all arrays in {@code listOfBytes}, i.e. their + * unencoded size (also passed as the initial size of the encoding stream) + */ + public static int calculateSize(List<byte[]> listOfBytes) { + int size = 0; + for (byte[] bytes : listOfBytes) { + size += bytes.length; + } + return size; + } + + /** + * Decodes the next value from {@code input}. + * @return the decoder's shared pointer to the decoded value (reused by the next call) + * @throws EOFException when {@code input} is exhausted; any other IOException is + * rethrown wrapped in a RuntimeException + */ + public static ImmutableBytesWritable decode(PrefixByteDecoder decoder, DataInput input) throws EOFException { + try { + 
ImmutableBytesWritable val= decoder.decode(input); + return val; + } catch(EOFException eof){ + throw eof; + }catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Closes {@code stream} if it is non-null, rethrowing any IOException wrapped in a + * RuntimeException. + */ + public static void close(ByteArrayInputStream stream) { + if (stream != null) { + try { + stream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Closes {@code stream} if it is non-null, rethrowing any IOException wrapped in a + * RuntimeException. + */ + public static void close(ByteArrayOutputStream stream) { + if (stream != null) { + try { + stream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteDecoder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteDecoder.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteDecoder.java new file mode 100644 index 0000000..c34bda8 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteDecoder.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.phoenix.util; + +import java.io.DataInput; +import java.io.IOException; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.WritableUtils; + +/** + * + * Prefix decoder for byte arrays. For encoding, see {@link PrefixByteEncoder}. + * + */ +public class PrefixByteDecoder { + private final int maxLength; + private final ImmutableBytesWritable previous; + + /** + * Used when the maximum length of encoded byte array is not known. Will + * cause a new byte array to be allocated for each call to {@link #decode(DataInput)}. + */ + public PrefixByteDecoder() { + maxLength = -1; + previous = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY); + } + + /** + * Used when the maximum length of encoded byte array is known in advance. + * Will not allocate new byte array with each call to {@link #decode(DataInput)}. + * A value that is not positive behaves like the no-arg constructor. Decoding a + * value longer than {@code maxLength} will fail, since the shared buffer is only + * {@code maxLength} bytes long. + * @param maxLength maximum length needed for any call to {@link #decode(DataInput)}. + */ + public PrefixByteDecoder(int maxLength) { + if (maxLength > 0) { + this.maxLength = maxLength; + this.previous = new ImmutableBytesWritable(new byte[maxLength], 0, 0); + } else { + this.maxLength = -1; + previous = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY); + } + } + + /** + * Resets state of decoder if it will be used to decode bytes from a + * different DataInput. + */ + public void reset() { + previous.set(previous.get(),0,0); + } + + /** + * Decodes bytes encoded with {@link PrefixByteEncoder}. + * @param in Input from which bytes are read. + * @return Pointer containing bytes that were decoded. Note that the + * same pointer will be returned with each call, so it must be consumed + * prior to calling decode again.
+ * @throws IOException if reading from {@code in} fails, e.g. an EOFException + * once the input is exhausted + */ + public ImmutableBytesWritable decode(DataInput in) throws IOException { + int prefixLen = WritableUtils.readVInt(in); + int suffixLen = WritableUtils.readVInt(in); + int length = prefixLen + suffixLen; + byte[] b; + if (maxLength == -1) { // Allocate new byte array each time + b = new byte[length]; + System.arraycopy(previous.get(), previous.getOffset(), b, 0, prefixLen); + } else { // Reuse same buffer each time: the shared prefix is already in place at offset 0 + b = previous.get(); + } + in.readFully(b, prefixLen, suffixLen); + previous.set(b, 0, length); + return previous; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteEncoder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteEncoder.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteEncoder.java new file mode 100644 index 0000000..bf92be5 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PrefixByteEncoder.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.phoenix.util; + +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.WritableUtils; + +/** + * + * Prefix encoder for byte arrays. For decoding, see {@link PrefixByteDecoder}. + * <p> + * Each value is written as a vint holding the number of leading bytes it shares + * with the previously encoded value, followed by the remaining suffix written with + * {@link Bytes#writeByteArray(DataOutput, byte[], int, int)} (a vint length, then the bytes). + * The encoder is stateful: it remembers the last value encoded and the longest length seen. + * + */ +public class PrefixByteEncoder { + private int maxLength; + private final ImmutableBytesWritable previous = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY); + + public PrefixByteEncoder() { + } + + /** + * Forgets the previous byte array that may have been encoded, so the next value + * is written without a shared prefix. The value returned by {@link #getMaxLength()} + * is not reset. + */ + public void reset() { + previous.set(ByteUtil.EMPTY_BYTE_ARRAY); + } + + /** + * @return the maximum length byte array encountered while encoding + */ + public int getMaxLength() { + return maxLength; + } + + /** + * Prefix encodes the byte array pointed to into the output stream + * @param out output stream to encode into + * @param ptr pointer to byte array to encode. + * @throws IOException if writing to {@code out} fails + */ + public void encode(DataOutput out, ImmutableBytesWritable ptr) throws IOException { + encode(out, ptr.get(), ptr.getOffset(), ptr.getLength()); + } + + /** + * Prefix encodes the byte array into the output stream + * @param out output stream to encode into + * @param b byte array to encode + * @throws IOException if writing to {@code out} fails + */ + public void encode(DataOutput out, byte[] b) throws IOException { + encode(out, b, 0, b.length); + } + + /** + * Prefix encodes the byte array from offset to length into output stream. 
+ * Instead of writing the entire byte array, only the portion of the byte array + * that differs from the beginning of the previous byte array written is written.
+ * <p> + * The encoder keeps a reference to {@code b} (not a copy) to compare against the next + * value, so the caller must not modify this range of {@code b} before the next call to + * encode or {@link #reset()}. + * + * @param out output stream to encode into + * @param b byte array buffer + * @param offset offset into byte array to start encoding + * @param length length of byte array to encode + * @throws IOException if writing to {@code out} fails + */ + public void encode(DataOutput out, byte[] b, int offset, int length) throws IOException { + int prevOffset = previous.getOffset(); + byte[] prevBytes = previous.get(); + int prevLength = previous.getLength(); + // Bound the shared-prefix scan by the length of the range being encoded, not by + // b.length: with a non-zero offset or a range shorter than the array, scanning up to + // b.length could read past the array or report a prefix longer than the value + // (making length - i negative). + int minLength = Math.min(prevLength, length); + int i = 0; + while (i < minLength && prevBytes[prevOffset + i] == b[offset + i]) { + i++; + } + WritableUtils.writeVInt(out, i); + Bytes.writeByteArray(out, b, offset + i, length - i); + previous.set(b, offset, length); + if (length > maxLength) { + maxLength = length; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java index c931851..1d6f438 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java @@ -52,12 +52,14 @@ import java.util.Set; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HColumnDescriptor; import 
org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; @@ -100,6 +102,7 @@ import 
com.google.common.collect.Sets; public class UpgradeUtil { private static final Logger logger = LoggerFactory.getLogger(UpgradeUtil.class); private static final byte[] SEQ_PREFIX_BYTES = ByteUtil.concat(QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes("_SEQ_")); + public static final byte[] UPGRADE_TO_4_7_COLUMN_NAME = Bytes.toBytes("UPGRADE_TO_4_7"); public static String UPSERT_BASE_COLUMN_COUNT_IN_HEADER_ROW = "UPSERT " + "INTO SYSTEM.CATALOG " @@ -1215,4 +1218,67 @@ public class UpgradeUtil { MetaDataEndpointImpl.ROW_KEY_ORDER_OPTIMIZABLE_BYTES, PBoolean.INSTANCE.toBytes(true)); tableMetadata.add(put); } -} + + /** + * Deletes every Put cell in {@code statsTable}, at most once, when the SYSTEM.CATALOG header + * row's empty-column cell in {@code metaTable} is older than + * {@link MetaDataProtocol#MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0}. + * <p> + * Only the caller whose checkAndPut first writes the {@link #UPGRADE_TO_4_7_COLUMN_NAME} + * marker on the SYSTEM.STATS header row of {@code metaTable} performs the deletion; every + * other caller (and any caller when the header cell is missing or new enough) returns false. + * + * @param metaTable table holding the SYSTEM.CATALOG and SYSTEM.STATS header rows + * (presumably SYSTEM.CATALOG itself) + * @param statsTable table whose Put cells are deleted (presumably SYSTEM.STATS) + * @return true if this call deleted the stats cells, false otherwise + * @throws IOException if reading or writing either table fails + * @throws InterruptedException if a batch delete is interrupted + */ + public static boolean truncateStats(HTableInterface metaTable, HTableInterface statsTable) + throws IOException, InterruptedException { + List<Cell> columnCells = metaTable + .get(new Get(SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME, + PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE))) + .getColumnCells(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES); + if (!columnCells.isEmpty() + && columnCells.get(0).getTimestamp() < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0) { + + byte[] statsTableKey = SchemaUtil.getTableKey(null, PhoenixDatabaseMetaData.SYSTEM_SCHEMA_NAME, + PhoenixDatabaseMetaData.SYSTEM_STATS_TABLE); + KeyValue upgradeKV = KeyValueUtil.newKeyValue(statsTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + UPGRADE_TO_4_7_COLUMN_NAME, MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 - 1, + ByteUtil.EMPTY_BYTE_ARRAY); + Put upgradePut = new Put(statsTableKey); + upgradePut.add(upgradeKV); + + // checkAndPut against an absent UPGRADE_TO_4_7_COLUMN_NAME value so that only a single + // client drops the rows of SYSTEM.STATS + if (metaTable.checkAndPut(statsTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + UPGRADE_TO_4_7_COLUMN_NAME, null, upgradePut)) { + List<Mutation> mutations = Lists.newArrayListWithExpectedSize(1000); + Scan scan = new Scan(); + scan.setRaw(true); + 
scan.setMaxVersions(); + ResultScanner statsScanner = statsTable.getScanner(scan); + try { + Result r; + while ((r = statsScanner.next()) != null) { + Delete delete = null; + // A raw scan also returns delete markers; only Put cells need a matching Delete + for (KeyValue keyValue : r.raw()) { + if (KeyValue.Type.codeToType(keyValue.getType()) == KeyValue.Type.Put) { + if (delete == null) { + delete = new Delete(keyValue.getRow()); + } + KeyValue deleteKeyValue = new KeyValue(keyValue.getRowArray(), keyValue.getRowOffset(), + keyValue.getRowLength(), keyValue.getFamilyArray(), keyValue.getFamilyOffset(), + keyValue.getFamilyLength(), keyValue.getQualifierArray(), + keyValue.getQualifierOffset(), keyValue.getQualifierLength(), + keyValue.getTimestamp(), KeyValue.Type.Delete, ByteUtil.EMPTY_BYTE_ARRAY, 0, 0); + delete.addDeleteMarker(deleteKeyValue); + } + } + if (delete != null) { + mutations.add(delete); + // Flush in small batches so the pending list stays bounded + if (mutations.size() > 10) { + statsTable.batch(mutations); + mutations.clear(); + } + } + } + if (!mutations.isEmpty()) { + statsTable.batch(mutations); + } + } finally { + // Always release the scanner, even if next() or batch() throws + statsScanner.close(); + } + return true; + } + } + return false; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java index a0aca4d..bb5f408 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java @@ -22,12 +22,10 @@ import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import java.sql.Connection; import java.sql.DriverManager; import 
java.sql.PreparedStatement; -import java.util.Collections; import java.util.Map; import java.util.Properties; import java.util.SortedMap; -import 
org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.end2end.Shadower; @@ -40,6 +38,7 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.stats.GuidePostsInfo; +import org.apache.phoenix.schema.stats.GuidePostsInfoBuilder; import org.apache.phoenix.schema.stats.PTableStats; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -648,10 +647,11 @@ public class SkipScanBigFilterTest extends BaseConnectionlessQueryTest { stmt.execute(); final PTable table = conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "PERF.BIG_OLAP_DOC")); - GuidePostsInfo info = new GuidePostsInfo(0,Collections.<byte[]> emptyList(), 0l); + GuidePostsInfoBuilder gpWriter = new GuidePostsInfoBuilder(); for (byte[] gp : guidePosts) { - info.addGuidePost(gp, 1000); + gpWriter.addGuidePosts(gp, 1000); } + GuidePostsInfo info = gpWriter.build(); final SortedMap<byte[], GuidePostsInfo> gpMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); gpMap.put(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, info); PTable tableWithStats = PTableImpl.makePTable(table, new PTableStats() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-core/src/test/java/org/apache/phoenix/util/PrefixByteEncoderDecoderTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/PrefixByteEncoderDecoderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/PrefixByteEncoderDecoderTest.java new file mode 100644 index 0000000..f8aa7db --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/PrefixByteEncoderDecoderTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.query.QueryConstants; +import org.junit.Test; + + +public class PrefixByteEncoderDecoderTest { + + static final List<byte[]> guideposts = Arrays.asList( + ByteUtil.concat(Bytes.toBytes("aaaaaaaaaa"), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(1000L), Bytes.toBytes("bbbbbbbbbb")), + ByteUtil.concat(Bytes.toBytes("aaaaaaaaaa"), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(1000L), Bytes.toBytes("bbbbbccccc")), + ByteUtil.concat(Bytes.toBytes("aaaaaaaaaa"), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(2000L), Bytes.toBytes("bbbbbbbbbb")), + ByteUtil.concat(Bytes.toBytes("bbbbbbbbbb"), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(1000L), Bytes.toBytes("bbbbbbbbbb")), + ByteUtil.concat(Bytes.toBytes("bbbbbbbbbb"), QueryConstants.SEPARATOR_BYTE_ARRAY, 
Bytes.toBytes(2000L), Bytes.toBytes("bbbbbbbbbb")), + ByteUtil.concat(Bytes.toBytes("bbbbbbbbbb"), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(2000L), Bytes.toBytes("c")), + ByteUtil.concat(Bytes.toBytes("bbbbbbbbbbb"), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(1000L), Bytes.toBytes("bbbbbbbbbb")), + ByteUtil.concat(Bytes.toBytes("d"), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(1000L), Bytes.toBytes("bbbbbbbbbb")), + ByteUtil.concat(Bytes.toBytes("d"), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(1000L), Bytes.toBytes("bbbbbbbbbbc")), + ByteUtil.concat(Bytes.toBytes("e"), QueryConstants.SEPARATOR_BYTE_ARRAY, Bytes.toBytes(1000L), Bytes.toBytes("bbbbbbbbbb")) + ); + + @Test + public void testEncode() throws IOException { + List<byte[]> listOfBytes = Arrays.asList(Bytes.toBytes("aaaaa"), Bytes.toBytes("aaaabb")); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + int maxLength = PrefixByteCodec.encodeBytes(listOfBytes, ptr); + assertEquals(6, maxLength); + TrustedByteArrayOutputStream stream = new TrustedByteArrayOutputStream(PrefixByteCodec.calculateSize(listOfBytes)); + DataOutput output = new DataOutputStream(stream); + WritableUtils.writeVInt(output, 0); + WritableUtils.writeVInt(output, 5); + output.write(Bytes.toBytes("aaaaa")); // No space savings on first key + WritableUtils.writeVInt(output, 4); + WritableUtils.writeVInt(output, 2); + output.write(Bytes.toBytes("bb")); // Only writes part of second key that's different + assertArrayEquals(stream.toByteArray(), ptr.copyBytes()); + } + + @Test + public void testEncodeDecodeWithSingleBuffer() throws IOException { + testEncodeDecode(true); + } + + @Test + public void testEncodeDecodeWithNewBuffer() throws IOException { + testEncodeDecode(false); + } + + private void testEncodeDecode(boolean useSingleBuffer) throws IOException { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + int maxLength = PrefixByteCodec.encodeBytes(guideposts, ptr); + int encodedSize = 
ptr.getLength(); + int unencodedSize = PrefixByteCodec.calculateSize(guideposts); + assertTrue(encodedSize < unencodedSize); + List<byte[]> listOfBytes = PrefixByteCodec.decodeBytes(ptr, useSingleBuffer ? maxLength : -1); + assertListByteArraysEquals(guideposts, listOfBytes); + } + + private static void assertListByteArraysEquals(List<byte[]> listOfBytes1, List<byte[]> listOfBytes2) { + assertEquals(listOfBytes1.size(), listOfBytes2.size()); + for (int i = 0; i < listOfBytes1.size(); i++) { + assertArrayEquals(listOfBytes1.get(i), listOfBytes2.get(i)); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a27cecc/phoenix-protocol/src/main/PGuidePosts.proto ---------------------------------------------------------------------- diff --git a/phoenix-protocol/src/main/PGuidePosts.proto b/phoenix-protocol/src/main/PGuidePosts.proto index 047a658..14de2eb 100644 --- a/phoenix-protocol/src/main/PGuidePosts.proto +++ b/phoenix-protocol/src/main/PGuidePosts.proto @@ -27,4 +27,7 @@ message PGuidePosts { repeated bytes guidePosts = 1; optional int64 byteCount = 2; optional int64 rowCount = 3; + optional int32 maxLength = 4; + optional int32 encodedGuidePostsCount = 5; + optional bytes encodedGuidePosts = 6; } \ No newline at end of file
