Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 0757dc72f -> 1cb426b98 refs/heads/trunk f1475244f -> 42e483a4e
Fix loading set types in pig with the 2.1 client driver. Patch by Artem Aliev, reviewed by brandonwilliams for CASSANDRA-8577 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1cb426b9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1cb426b9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1cb426b9 Branch: refs/heads/cassandra-2.1 Commit: 1cb426b9831b42b5f368eac51a6e3bebdb1bd62a Parents: 0757dc7 Author: Brandon Williams <[email protected]> Authored: Tue Jan 13 10:23:28 2015 -0600 Committer: Brandon Williams <[email protected]> Committed: Tue Jan 13 10:23:28 2015 -0600 ---------------------------------------------------------------------- .../apache/cassandra/hadoop/cql3/CqlConfigHelper.java | 11 +++++++++++ .../apache/cassandra/hadoop/cql3/CqlRecordReader.java | 12 ++++++++++++ .../cassandra/hadoop/pig/AbstractCassandraStorage.java | 5 ++++- .../apache/cassandra/hadoop/pig/CqlNativeStorage.java | 4 ++++ 4 files changed, 31 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java index 2be811f..7d65663 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java @@ -81,6 +81,8 @@ public class CqlConfigHelper private static final String INPUT_NATIVE_SSL_KEY_STORE_PASSWARD = "cassandra.input.native.ssl.key.store.password"; private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites"; + private static final String INPUT_NATIVE_PROTOCOL_VERSION = "cassandra.input.native.protocol.version"; + private static final String OUTPUT_CQL = "cassandra.output.cql"; /** @@ -279,6 +281,10 @@ public class CqlConfigHelper return conf.get(OUTPUT_CQL); } + private static Optional<Integer> getProtocolVersion(Configuration conf) { + return getIntSetting(INPUT_NATIVE_PROTOCOL_VERSION, conf); + } + public static Cluster getInputCluster(String host, Configuration conf) { // this method has been left for backward compatibility @@ -290,6 +296,7 @@ public class CqlConfigHelper int port = getInputNativePort(conf); Optional<AuthProvider> authProvider = getAuthProvider(conf); Optional<SSLOptions> sslOptions = getSSLOptions(conf); + Optional<Integer> protocolVersion = getProtocolVersion(conf); LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, hosts); SocketOptions socketOptions = getReadSocketOptions(conf); QueryOptions queryOptions = getReadQueryOptions(conf); @@ -305,6 +312,9 @@ public class CqlConfigHelper if (sslOptions.isPresent()) builder.withSSL(sslOptions.get()); + if (protocolVersion.isPresent()) { + builder.withProtocolVersion(protocolVersion.get()); + } builder.withLoadBalancingPolicy(loadBalancingPolicy) .withSocketOptions(socketOptions) .withQueryOptions(queryOptions) @@ -313,6 +323,7 @@ public class CqlConfigHelper return builder.build(); } + public static void setInputCoreConnections(Configuration conf, String connections) { conf.set(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, connections); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java index 9c1118b..6a1f5bf 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java @@ -89,6 +89,7 @@ public class CqlRecordReader extends RecordReader<Long, Row> // partition keys -- key aliases private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap(); + protected int nativeProtocolVersion = 1; public CqlRecordReader() { @@ -129,6 +130,9 @@ public class CqlRecordReader extends RecordReader<Long, Row> if (session == null) throw new RuntimeException("Can't create connection session"); + //get negotiated serialization protocol + nativeProtocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion(); + // If the user provides a CQL query then we will use it without validation // otherwise we will fall back to building a query using the: // inputColumns @@ -230,6 +234,14 @@ public class CqlRecordReader extends RecordReader<Long, Row> return new WrappedRow(); } + /** + * Return native version protocol of the cluster connection + * @return serialization protocol version. + */ + public int getNativeProtocolVersion() { + return nativeProtocolVersion; + } + /** CQL row iterator * Input cql query * 1) select clause must include key columns (if we use partition key based row count) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java index 361baa4..035f99a 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java @@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory; */ public abstract class AbstractCassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata { + protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR }; // system environment variables that can be set to configure connection info: @@ -101,6 +102,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store protected boolean usePartitionFilter = false; protected String initHostAddress; protected String rpcPort; + protected int nativeProtocolVersion = 1; + public AbstractCassandraStorage() { @@ -793,7 +796,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store { // For CollectionType, the compose() method assumes the v3 protocol format of collection, which // is not correct here since we query using the CQL-over-thrift interface which use the pre-v3 format - return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, 1); + return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion); } return validator.compose(value); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java index 3c59a1c..f0bb8f9 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java @@ -28,6 +28,7 @@ import org.apache.cassandra.db.composites.CellNames; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.hadoop.cql3.CqlConfigHelper; +import org.apache.cassandra.hadoop.cql3.CqlRecordReader; import org.apache.cassandra.thrift.CfDef; import org.apache.cassandra.thrift.ColumnDef; import org.apache.cassandra.utils.ByteBufferUtil; @@ -78,6 +79,9 @@ public class CqlNativeStorage extends CqlStorage public void prepareToRead(RecordReader reader, PigSplit split) { this.reader = reader; + if (reader instanceof CqlRecordReader) { + nativeProtocolVersion = ((CqlRecordReader) reader).getNativeProtocolVersion(); + } } /** get next row */
