http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/interface/thrift/gen-java/org/apache/cassandra/thrift/CfSplit.java ---------------------------------------------------------------------- diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfSplit.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfSplit.java new file mode 100644 index 0000000..2519f9f --- /dev/null +++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfSplit.java @@ -0,0 +1,549 @@ +/** + * Autogenerated by Thrift Compiler (0.7.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + */ +package org.apache.cassandra.thrift; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import org.apache.commons.lang.builder.HashCodeBuilder; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Represents input splits used by hadoop ColumnFamilyRecordReaders + */ +public class CfSplit implements org.apache.thrift.TBase<CfSplit, CfSplit._Fields>, java.io.Serializable, Cloneable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("CfSplit"); + + private static final org.apache.thrift.protocol.TField START_TOKEN_FIELD_DESC = new org.apache.thrift.protocol.TField("start_token", org.apache.thrift.protocol.TType.STRING, (short)1); + private static final org.apache.thrift.protocol.TField END_TOKEN_FIELD_DESC = new org.apache.thrift.protocol.TField("end_token", org.apache.thrift.protocol.TType.STRING, (short)2); + private static final org.apache.thrift.protocol.TField ROW_COUNT_FIELD_DESC = new org.apache.thrift.protocol.TField("row_count", org.apache.thrift.protocol.TType.I64, (short)3); + + public String start_token; // required + public String end_token; // required + public long row_count; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + START_TOKEN((short)1, "start_token"), + END_TOKEN((short)2, "end_token"), + ROW_COUNT((short)3, "row_count"); + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. 
+ */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // START_TOKEN + return START_TOKEN; + case 2: // END_TOKEN + return END_TOKEN; + case 3: // ROW_COUNT + return ROW_COUNT; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + private static final int __ROW_COUNT_ISSET_ID = 0; + private BitSet __isset_bit_vector = new BitSet(1); + + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.START_TOKEN, new org.apache.thrift.meta_data.FieldMetaData("start_token", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.END_TOKEN, new org.apache.thrift.meta_data.FieldMetaData("end_token", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.ROW_COUNT, new org.apache.thrift.meta_data.FieldMetaData("row_count", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CfSplit.class, metaDataMap); + } + + public CfSplit() { + } + + public CfSplit( + String start_token, + String end_token, + long row_count) + { + this(); + this.start_token = start_token; + this.end_token = end_token; + this.row_count = row_count; + setRow_countIsSet(true); + } + + /** + * Performs a deep copy on <i>other</i>. 
+ */ + public CfSplit(CfSplit other) { + __isset_bit_vector.clear(); + __isset_bit_vector.or(other.__isset_bit_vector); + if (other.isSetStart_token()) { + this.start_token = other.start_token; + } + if (other.isSetEnd_token()) { + this.end_token = other.end_token; + } + this.row_count = other.row_count; + } + + public CfSplit deepCopy() { + return new CfSplit(this); + } + + @Override + public void clear() { + this.start_token = null; + this.end_token = null; + setRow_countIsSet(false); + this.row_count = 0; + } + + public String getStart_token() { + return this.start_token; + } + + public CfSplit setStart_token(String start_token) { + this.start_token = start_token; + return this; + } + + public void unsetStart_token() { + this.start_token = null; + } + + /** Returns true if field start_token is set (has been assigned a value) and false otherwise */ + public boolean isSetStart_token() { + return this.start_token != null; + } + + public void setStart_tokenIsSet(boolean value) { + if (!value) { + this.start_token = null; + } + } + + public String getEnd_token() { + return this.end_token; + } + + public CfSplit setEnd_token(String end_token) { + this.end_token = end_token; + return this; + } + + public void unsetEnd_token() { + this.end_token = null; + } + + /** Returns true if field end_token is set (has been assigned a value) and false otherwise */ + public boolean isSetEnd_token() { + return this.end_token != null; + } + + public void setEnd_tokenIsSet(boolean value) { + if (!value) { + this.end_token = null; + } + } + + public long getRow_count() { + return this.row_count; + } + + public CfSplit setRow_count(long row_count) { + this.row_count = row_count; + setRow_countIsSet(true); + return this; + } + + public void unsetRow_count() { + __isset_bit_vector.clear(__ROW_COUNT_ISSET_ID); + } + + /** Returns true if field row_count is set (has been assigned a value) and false otherwise */ + public boolean isSetRow_count() { + return __isset_bit_vector.get(__ROW_COUNT_ISSET_ID); + } + + public void setRow_countIsSet(boolean value) { + __isset_bit_vector.set(__ROW_COUNT_ISSET_ID, value); + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case START_TOKEN: + if (value == null) { + unsetStart_token(); + } else { + setStart_token((String)value); + } + break; + + case END_TOKEN: + if (value == null) { + unsetEnd_token(); + } else { + setEnd_token((String)value); + } + break; + + case ROW_COUNT: + if (value == null) { + unsetRow_count(); + } else { + setRow_count((Long)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case START_TOKEN: + return getStart_token(); + + case END_TOKEN: + return getEnd_token(); + + case ROW_COUNT: + return Long.valueOf(getRow_count()); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case START_TOKEN: + return isSetStart_token(); + case END_TOKEN: + return isSetEnd_token(); + case ROW_COUNT: + return isSetRow_count(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof CfSplit) + return this.equals((CfSplit)that); + return false; + } + + public boolean equals(CfSplit that) { + if (that == null) + return false; + + boolean this_present_start_token = 
true && this.isSetStart_token(); + boolean that_present_start_token = true && that.isSetStart_token(); + if (this_present_start_token || that_present_start_token) { + if (!(this_present_start_token && that_present_start_token)) + return false; + if (!this.start_token.equals(that.start_token)) + return false; + } + + boolean this_present_end_token = true && this.isSetEnd_token(); + boolean that_present_end_token = true && that.isSetEnd_token(); + if (this_present_end_token || that_present_end_token) { + if (!(this_present_end_token && that_present_end_token)) + return false; + if (!this.end_token.equals(that.end_token)) + return false; + } + + boolean this_present_row_count = true; + boolean that_present_row_count = true; + if (this_present_row_count || that_present_row_count) { + if (!(this_present_row_count && that_present_row_count)) + return false; + if (this.row_count != that.row_count) + return false; + } + + return true; + } + + @Override + public int hashCode() { + HashCodeBuilder builder = new HashCodeBuilder(); + + boolean present_start_token = true && (isSetStart_token()); + builder.append(present_start_token); + if (present_start_token) + builder.append(start_token); + + boolean present_end_token = true && (isSetEnd_token()); + builder.append(present_end_token); + if (present_end_token) + builder.append(end_token); + + boolean present_row_count = true; + builder.append(present_row_count); + if (present_row_count) + builder.append(row_count); + + return builder.toHashCode(); + } + + public int compareTo(CfSplit other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + CfSplit typedOther = (CfSplit)other; + + lastComparison = Boolean.valueOf(isSetStart_token()).compareTo(typedOther.isSetStart_token()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetStart_token()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.start_token, typedOther.start_token); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetEnd_token()).compareTo(typedOther.isSetEnd_token()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetEnd_token()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.end_token, typedOther.end_token); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetRow_count()).compareTo(typedOther.isSetRow_count()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetRow_count()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.row_count, typedOther.row_count); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (field.id) { + case 1: // START_TOKEN + if (field.type == org.apache.thrift.protocol.TType.STRING) { + this.start_token = iprot.readString(); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 2: // END_TOKEN + if (field.type == org.apache.thrift.protocol.TType.STRING) { + this.end_token = iprot.readString(); + } else { + 
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + case 3: // ROW_COUNT + if (field.type == org.apache.thrift.protocol.TType.I64) { + this.row_count = iprot.readI64(); + setRow_countIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + if (!isSetRow_count()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'row_count' was not found in serialized data! Struct: " + toString()); + } + validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (this.start_token != null) { + oprot.writeFieldBegin(START_TOKEN_FIELD_DESC); + oprot.writeString(this.start_token); + oprot.writeFieldEnd(); + } + if (this.end_token != null) { + oprot.writeFieldBegin(END_TOKEN_FIELD_DESC); + oprot.writeString(this.end_token); + oprot.writeFieldEnd(); + } + oprot.writeFieldBegin(ROW_COUNT_FIELD_DESC); + oprot.writeI64(this.row_count); + oprot.writeFieldEnd(); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("CfSplit("); + boolean first = true; + + sb.append("start_token:"); + if (this.start_token == null) { + sb.append("null"); + } else { + sb.append(this.start_token); + } + first = false; + if (!first) sb.append(", "); + sb.append("end_token:"); + if (this.end_token == null) { + sb.append("null"); + } else { + sb.append(this.end_token); + } + first = false; + if (!first) sb.append(", "); + sb.append("row_count:"); + sb.append(this.row_count); + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + if (start_token == null) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'start_token' was not present! Struct: " + toString()); + } + if (end_token == null) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'end_token' was not present! Struct: " + toString()); + } + // alas, we cannot check 'row_count' because it's a primitive and you chose the non-beans generator. + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. + __isset_bit_vector = new BitSet(1); + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + +} +
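The generated struct above ends with writeObject/readObject hooks that route plain Java serialization through Thrift's TCompactProtocol, which is what lets Hadoop ship these splits between processes. A minimal usage sketch follows, assuming only that the generated class is on the classpath; the class name, tokens and row count below are illustrative and not part of the commit.

// Hypothetical usage sketch (not part of the commit): round-trip a CfSplit
// through standard Java serialization; the generated writeObject/readObject
// hooks delegate to Thrift's TCompactProtocol under the covers.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.cassandra.thrift.CfSplit;

public class CfSplitRoundTrip
{
    public static void main(String[] args) throws Exception
    {
        // start/end tokens and the row-count estimate are made-up example values
        CfSplit split = new CfSplit("0", "85070591730234615865843651857942052864", 65536L);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bytes);
        oos.writeObject(split);            // invokes CfSplit.write() via TCompactProtocol
        oos.close();

        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        CfSplit copy = (CfSplit) ois.readObject();   // invokes CfSplit.read()

        System.out.println(split.equals(copy));      // true
        System.out.println(copy);                    // CfSplit(start_token:0, end_token:..., row_count:65536)
    }
}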
http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java ---------------------------------------------------------------------- diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java index 7e183c7..9d0701f 100644 --- a/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java +++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/Constants.java @@ -44,6 +44,6 @@ import org.slf4j.LoggerFactory; public class Constants { - public static final String VERSION = "19.32.0"; + public static final String VERSION = "19.33.0"; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java index cb79b01..c4c6570 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java @@ -35,6 +35,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.google.common.collect.ImmutableList; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.dht.IPartitioner; @@ -44,18 +47,11 @@ import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.thrift.KeyRange; import org.apache.cassandra.thrift.TokenRange; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.*; import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily. 
@@ -208,7 +204,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B public List<InputSplit> call() throws Exception { ArrayList<InputSplit> splits = new ArrayList<InputSplit>(); - List<String> tokens = getSubSplits(keyspace, cfName, range, conf); + List<CfSplit> subSplits = getSubSplits(keyspace, cfName, range, conf); assert range.rpc_endpoints.size() == range.endpoints.size() : "rpc_endpoints size must match endpoints size"; // turn the sub-ranges into InputSplits String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]); @@ -223,15 +219,21 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B } Token.TokenFactory factory = partitioner.getTokenFactory(); - for (int i = 1; i < tokens.size(); i++) + for (CfSplit subSplit : subSplits) { - Token left = factory.fromString(tokens.get(i - 1)); - Token right = factory.fromString(tokens.get(i)); + Token left = factory.fromString(subSplit.getStart_token()); + Token right = factory.fromString(subSplit.getEnd_token()); Range<Token> range = new Range<Token>(left, right, partitioner); List<Range<Token>> ranges = range.isWrapAround() ? range.unwrap() : ImmutableList.of(range); for (Range<Token> subrange : ranges) { - ColumnFamilySplit split = new ColumnFamilySplit(factory.toString(subrange.left), factory.toString(subrange.right), endpoints); + ColumnFamilySplit split = + new ColumnFamilySplit( + factory.toString(subrange.left), + factory.toString(subrange.right), + subSplit.getRow_count(), + endpoints); + logger.debug("adding " + split); splits.add(split); } @@ -240,7 +242,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B } } - private List<String> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException + private List<CfSplit> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException { int splitsize = ConfigHelper.getInputSplitSize(conf); for (int i = 0; i < range.rpc_endpoints.size(); i++) @@ -254,7 +256,7 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B { Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf)); client.set_keyspace(keyspace); - return client.describe_splits(cfName, range.start_token, range.end_token, splitsize); + return client.describe_splits_ex(cfName, range.start_token, range.end_token, splitsize); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java index 73f9786..c662932 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java @@ -145,12 +145,11 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap predicate = ConfigHelper.getInputSlicePredicate(conf); boolean widerows = ConfigHelper.getInputIsWide(conf); isEmptyPredicate = isEmptyPredicate(predicate); - totalRowCount = ConfigHelper.getInputSplitSize(conf); + totalRowCount = (int) this.split.getLength(); batchSize = ConfigHelper.getRangeBatchSize(conf); cfName = ConfigHelper.getInputColumnFamily(conf); consistencyLevel = 
ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(conf)); - keyspace = ConfigHelper.getInputKeyspace(conf); try @@ -189,7 +188,11 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap public boolean nextKeyValue() throws IOException { if (!iter.hasNext()) + { + logger.debug("Finished scanning " + iter.rowsRead() + " rows (estimate was: " + totalRowCount + ")"); return false; + } + currentRow = iter.next(); return true; } @@ -482,7 +485,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> next = wideColumns.next(); lastColumn = next.right.values().iterator().next().name(); - maybeCountRow(next); + maybeIncreaseRowCounter(next); return next; } @@ -491,7 +494,7 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap * Increases the row counter only if we really moved to the next row. * @param next just fetched row slice */ - private void maybeCountRow(Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> next) + private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> next) { ByteBuffer currentKey = next.left; if (!currentKey.equals(lastCountedKey)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java index bd2e487..4085c68 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java @@ -33,14 +33,22 @@ public class ColumnFamilySplit extends InputSplit implements Writable, org.apach { private String startToken; private String endToken; + private long length; private String[] dataNodes; + @Deprecated public ColumnFamilySplit(String startToken, String endToken, String[] dataNodes) { + this(startToken, endToken, Long.MAX_VALUE, dataNodes); + } + + public ColumnFamilySplit(String startToken, String endToken, long length, String[] dataNodes) + { assert startToken != null; assert endToken != null; this.startToken = startToken; this.endToken = endToken; + this.length = length; this.dataNodes = dataNodes; } @@ -58,8 +66,7 @@ public class ColumnFamilySplit extends InputSplit implements Writable, org.apach public long getLength() { - // only used for sorting splits. we don't have the capability, yet. 
- return Long.MAX_VALUE; + return length; } public String[] getLocations() @@ -76,7 +83,7 @@ public class ColumnFamilySplit extends InputSplit implements Writable, org.apach { out.writeUTF(startToken); out.writeUTF(endToken); - + out.writeLong(length); out.writeInt(dataNodes.length); for (String endpoint : dataNodes) { @@ -88,6 +95,7 @@ public class ColumnFamilySplit extends InputSplit implements Writable, org.apach { startToken = in.readUTF(); endToken = in.readUTF(); + length = in.readLong(); int numOfEndpoints = in.readInt(); dataNodes = new String[numOfEndpoints]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index b1eaa1e..80c3f46 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -36,6 +36,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.*; import org.apache.cassandra.metrics.ClientRequestMetrics; + import org.apache.log4j.Level; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -2184,28 +2185,50 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe } /** - * @return list of Tokens (_not_ keys!) breaking up the data this node is responsible for into pieces of roughly keysPerSplit + * @return list of Token ranges (_not_ keys!) together with estimated key count, + * breaking up the data this node is responsible for into pieces of roughly keysPerSplit */ - public List<Token> getSplits(String table, String cfName, Range<Token> range, int keysPerSplit) + public List<Pair<Range<Token>, Long>> getSplits(String table, String cfName, Range<Token> range, int keysPerSplit) { - List<Token> tokens = new ArrayList<Token>(); - // we use the actual Range token for the first and last brackets of the splits to ensure correctness - tokens.add(range.left); - Table t = Table.open(table); ColumnFamilyStore cfs = t.getColumnFamilyStore(cfName); List<DecoratedKey> keys = keySamples(Collections.singleton(cfs), range); - int splits = keys.size() * DatabaseDescriptor.getIndexInterval() / keysPerSplit; - if (keys.size() >= splits) + final long totalRowCountEstimate = (keys.size() + 1) * DatabaseDescriptor.getIndexInterval(); + + // splitCount should be much smaller than number of key samples, to avoid huge sampling error + final int minSamplesPerSplit = 4; + final int maxSplitCount = keys.size() / minSamplesPerSplit + 1; + final int splitCount = Math.max(1, Math.min(maxSplitCount, (int)(totalRowCountEstimate / keysPerSplit))); + + List<Token> tokens = keysToTokens(range, keys); + return getSplits(tokens, splitCount); + } + + private List<Pair<Range<Token>, Long>> getSplits(List<Token> tokens, int splitCount) + { + final double step = (double) (tokens.size() - 1) / splitCount; + int prevIndex = 0; + Token prevToken = tokens.get(0); + List<Pair<Range<Token>, Long>> splits = Lists.newArrayListWithExpectedSize(splitCount); + for (int i = 1; i <= splitCount; i++) { - for (int i = 1; i < splits; i++) - { - int index = i * (keys.size() / splits); - tokens.add(keys.get(index).token); - } + int index = (int) Math.round(i * step); + Token token = tokens.get(index); + long rowCountEstimate = (index - prevIndex) * DatabaseDescriptor.getIndexInterval(); + splits.add(Pair.create(new 
Range<Token>(prevToken, token), rowCountEstimate)); + prevIndex = index; + prevToken = token; } + return splits; + } + private List<Token> keysToTokens(Range<Token> range, List<DecoratedKey> keys) + { + List<Token> tokens = Lists.newArrayListWithExpectedSize(keys.size() + 2); + tokens.add(range.left); + for (DecoratedKey key : keys) + tokens.add(key.token); tokens.add(range.right); return tokens; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/533bf3f6/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index ad416f3..3bf155e 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -30,6 +30,8 @@ import java.util.zip.Inflater; import com.google.common.base.Predicates; import com.google.common.collect.Maps; +import org.apache.cassandra.hadoop.ColumnFamilySplit; +import org.apache.cassandra.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -882,18 +884,33 @@ public class CassandraServer implements Cassandra.Iface return DatabaseDescriptor.getEndpointSnitch().getClass().getName(); } + @Deprecated public List<String> describe_splits(String cfName, String start_token, String end_token, int keys_per_split) throws TException, InvalidRequestException { + List<CfSplit> splits = describe_splits_ex(cfName, start_token, end_token, keys_per_split); + List<String> result = new ArrayList<String>(splits.size() + 1); + + result.add(splits.get(0).getStart_token()); + for (CfSplit cfSplit : splits) + result.add(cfSplit.getEnd_token()); + + return result; + } + + @Override + public List<CfSplit> describe_splits_ex(String cfName, String start_token, String end_token, int keys_per_split) + throws InvalidRequestException, TException + { // TODO: add keyspace authorization call post CASSANDRA-1425 Token.TokenFactory tf = StorageService.getPartitioner().getTokenFactory(); - List<Token> tokens = StorageService.instance.getSplits(state().getKeyspace(), cfName, new Range<Token>(tf.fromString(start_token), tf.fromString(end_token)), keys_per_split); - List<String> splits = new ArrayList<String>(tokens.size()); - for (Token token : tokens) - { - splits.add(tf.toString(token)); - } - return splits; + Range<Token> tr = new Range<Token>(tf.fromString(start_token), tf.fromString(end_token)); + List<Pair<Range<Token>, Long>> splits = + StorageService.instance.getSplits(state().getKeyspace(), cfName, tr, keys_per_split); + List<CfSplit> result = new ArrayList<CfSplit>(splits.size()); + for (Pair<Range<Token>, Long> split : splits) + result.add(new CfSplit(split.left.left.toString(), split.left.right.toString(), split.right)); + return result; } public void login(AuthenticationRequest auth_request) throws AuthenticationException, AuthorizationException, TException
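For context on the new estimation logic in StorageService.getSplits above: each index sample stands for roughly getIndexInterval() rows, the split count is capped so that every split covers several samples, and each resulting CfSplit carries the per-split row estimate that ColumnFamilySplit.getLength() now reports. A small standalone sketch of that arithmetic follows; the index interval, sample count and keys-per-split values are assumptions chosen for illustration, not values from the patch.

// Hedged, standalone sketch of the row-count / split-count estimation that the
// patched StorageService.getSplits performs; all constants are illustrative.
public class SplitEstimateSketch
{
    public static void main(String[] args)
    {
        int indexInterval = 128;    // stands in for DatabaseDescriptor.getIndexInterval()
        int sampledKeys   = 1000;   // stands in for keySamples(...).size() over the range
        int keysPerSplit  = 16384;  // requested split size from the Hadoop job

        // each index sample represents ~indexInterval rows
        long totalRowCountEstimate = (sampledKeys + 1) * (long) indexInterval;   // 128,128

        // keep splitCount well below the sample count to bound sampling error
        int minSamplesPerSplit = 4;
        int maxSplitCount = sampledKeys / minSamplesPerSplit + 1;                // 251
        int splitCount = Math.max(1, Math.min(maxSplitCount,
                                              (int) (totalRowCountEstimate / keysPerSplit)));  // 7

        // keysToTokens() yields sampledKeys + 2 tokens (range.left, samples, range.right),
        // so the stride between split boundaries is (tokens.size() - 1) / splitCount
        double step = (double) (sampledKeys + 1) / splitCount;                   // 143.0
        long rowsPerSplit = Math.round(step * indexInterval);                    // ~18,304

        System.out.printf("rows=%d splits=%d rows/split=%d%n",
                          totalRowCountEstimate, splitCount, rowsPerSplit);
    }
}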
