Repository: cassandra

Updated Branches:
  refs/heads/trunk 37986e825 -> 50f7e0204
Close Clusters and Sessions in Hadoop Input/Output classes

patch by Alex Liu; reviewed by Benjamin Lerer for CASSANDRA-10837

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2da3c9db
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2da3c9db
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2da3c9db

Branch: refs/heads/trunk
Commit: 2da3c9db154449e15d5a2c2072db77b65c9e931a
Parents: ed96322
Author: Alex Liu <[email protected]>
Authored: Thu Dec 17 12:18:46 2015 +0100
Committer: blerer <[email protected]>
Committed: Thu Dec 17 12:18:46 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                    |  1 +
 .../cassandra/hadoop/cql3/CqlInputFormat.java  | 46 +++++++-------
 .../cassandra/hadoop/cql3/CqlRecordWriter.java | 63 +++++++-------------
 3 files changed, 46 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2da3c9db/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7677e38..a2951a8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.3
+ * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-10837)
  * Fix sstableloader not working with upper case keyspace name (CASSANDRA-10806)
 Merged from 2.2:
  * Add new types to Stress (CASSANDRA-9556)
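The pattern adopted throughout this patch is worth calling out before the hunks: in the DataStax Java driver, Cluster and Session are both AutoCloseable, and closing a Cluster also closes the Sessions it created, so owning both in a single try-with-resources guarantees cleanup even when the body throws. A minimal self-contained sketch of the idiom; the contact point and query are illustrative only, not taken from the patch:

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;

    public class CloseClusterExample
    {
        public static void main(String[] args)
        {
            // try-with-resources closes the Session first, then the Cluster,
            // whether the body completes normally or throws.
            try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build(); // illustrative contact point
                 Session session = cluster.connect())
            {
                Row row = session.execute("SELECT release_version FROM system.local").one();
                System.out.println(row.getString("release_version"));
            }
        }
    }

The old getSplits code assigned the Session to a field and never closed the owning Cluster; the hunks below move both into the try-with-resources header and thread the Session through as a parameter instead.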
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2da3c9db/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index 534e66d..a426532 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -21,12 +21,14 @@
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.*;
 
+import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Host;
 import com.datastax.driver.core.Metadata;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.TokenRange;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -37,12 +39,12 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.thrift.KeyRange;
 import org.apache.cassandra.hadoop.*;
 
+import static java.util.stream.Collectors.toMap;
 
 /**
  * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
@@ -72,7 +74,6 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
     private String keyspace;
     private String cfName;
     private IPartitioner partitioner;
-    private Session session;
 
     public RecordReader<Long, Row> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter)
             throws IOException
@@ -123,14 +124,12 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
         partitioner = ConfigHelper.getInputPartitioner(conf);
         logger.trace("partitioner is {}", partitioner);
 
-        // canonical ranges and nodes holding replicas
-        Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(conf, keyspace);
-
         // canonical ranges, split into pieces, fetching the splits in parallel
         ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
         List<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
 
-        try
+        try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf);
+             Session session = cluster.connect())
         {
             List<Future<List<org.apache.hadoop.mapreduce.InputSplit>>> splitfutures = new ArrayList<>();
             KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
@@ -159,15 +158,17 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
                 }
             }
 
-            session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
-            Metadata metadata = session.getCluster().getMetadata();
+            Metadata metadata = cluster.getMetadata();
+
+            // canonical ranges and nodes holding replicas
+            Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(keyspace, metadata);
 
             for (TokenRange range : masterRangeNodes.keySet())
             {
                 if (jobRange == null)
                 {
                     // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
-                    splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf)));
+                    splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf, session)));
                 }
                 else
                 {
@@ -177,7 +178,7 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
                     for (TokenRange intersection: range.intersectWith(jobTokenRange))
                     {
                         // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
-                        splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf)));
+                        splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf, session)));
                     }
                 }
             }
@@ -212,13 +213,13 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
                                            metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
     }
 
-    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
+    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf, Session session) throws IOException
    {
        int splitSize = ConfigHelper.getInputSplitSize(conf);
        int splitSizeMb = ConfigHelper.getInputSplitSizeInMb(conf);
        try
        {
-            return describeSplits(keyspace, cfName, range, splitSize, splitSizeMb);
+            return describeSplits(keyspace, cfName, range, splitSize, splitSizeMb, session);
        }
        catch (Exception e)
        {
@@ -226,19 +227,14 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
         }
     }
 
-    private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
+    private Map<TokenRange, Set<Host>> getRangeMap(String keyspace, Metadata metadata)
     {
-        try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect())
-        {
-            Map<TokenRange, Set<Host>> map = new HashMap<>();
-            Metadata metadata = session.getCluster().getMetadata();
-            for (TokenRange tokenRange : metadata.getTokenRanges())
-                map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange));
-            return map;
-        }
+        return metadata.getTokenRanges()
+                       .stream()
+                       .collect(toMap(p -> p, p -> metadata.getReplicas('"' + keyspace + '"', p)));
     }
 
-    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, int splitSizeMb)
+    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize, int splitSizeMb, Session session)
     {
         String query = String.format("SELECT mean_partition_size, partitions_count " +
                                      "FROM %s.%s " +
@@ -303,19 +299,21 @@ public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long
         private final TokenRange tokenRange;
         private final Set<Host> hosts;
         private final Configuration conf;
+        private final Session session;
 
-        public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf)
+        public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf, Session session)
         {
             this.tokenRange = tr;
             this.hosts = hosts;
             this.conf = conf;
+            this.session = session;
         }
 
         public List<org.apache.hadoop.mapreduce.InputSplit> call() throws Exception
         {
             ArrayList<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>();
             Map<TokenRange, Long> subSplits;
-            subSplits = getSubSplits(keyspace, cfName, tokenRange, conf);
+            subSplits = getSubSplits(keyspace, cfName, tokenRange, conf, session);
             // turn the sub-ranges into InputSplits
             String[] endpoints = new String[hosts.size()];
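The rewritten getRangeMap above also swaps a hand-rolled HashMap loop for java.util.stream.Collectors.toMap, keyed by the token range itself. A small sketch of that collector in the same shape, with plain strings standing in for the driver's TokenRange and Host types (the range and host values are invented for illustration):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import static java.util.stream.Collectors.toMap;

    public class RangeMapSketch
    {
        public static void main(String[] args)
        {
            // Stand-ins for metadata.getTokenRanges().
            List<String> tokenRanges = Arrays.asList("(0,100]", "(100,200]");

            // toMap(keyMapper, valueMapper): the key mapper is the identity (p -> p);
            // the value mapper plays the role of metadata.getReplicas('"' + keyspace + '"', p).
            Map<String, Set<String>> rangeMap =
                    tokenRanges.stream()
                               .collect(toMap(p -> p, p -> new HashSet<>(Arrays.asList("hostA", "hostB"))));

            rangeMap.forEach((range, hosts) -> System.out.println(range + " -> " + hosts));
        }
    }

Note that Collectors.toMap throws IllegalStateException on duplicate keys; that is safe here because the driver reports each token range once.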
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2da3c9db/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 96815ef..4c9b249 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import com.datastax.driver.core.*;
 import com.datastax.driver.core.exceptions.*;
+
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
@@ -39,6 +40,8 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
 
+import static java.util.stream.Collectors.toMap;
+
 /**
  * The <code>CqlRecordWriter</code> maps the output <key, value>
  * pairs to a Cassandra table. In particular, it applies the binded variables
@@ -113,25 +116,18 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
         this.clients = new HashMap<>();
         String keyspace = ConfigHelper.getOutputKeyspace(conf);
 
-        try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf);
-             Session client = cluster.connect(keyspace))
+        try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf))
         {
-            ringCache = new NativeRingCache(conf);
-            if (client != null)
-            {
-                TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
-                clusterColumns = tableMetadata.getClusteringColumns();
-                partitionKeyColumns = tableMetadata.getPartitionKey();
-
-                String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
-                if (cqlQuery.toLowerCase().startsWith("insert"))
-                    throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
-                cql = appendKeyWhereClauses(cqlQuery);
-            }
-            else
-            {
-                throw new IllegalArgumentException("Invalid configuration specified " + conf);
-            }
+            Metadata metadata = cluster.getMetadata();
+            ringCache = new NativeRingCache(conf, metadata);
+            TableMetadata tableMetadata = metadata.getKeyspace(Metadata.quote(keyspace)).getTable(ConfigHelper.getOutputColumnFamily(conf));
+            clusterColumns = tableMetadata.getClusteringColumns();
+            partitionKeyColumns = tableMetadata.getPartitionKey();
+
+            String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
+            if (cqlQuery.toLowerCase().startsWith("insert"))
+                throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
+            cql = appendKeyWhereClauses(cqlQuery);
         }
         catch (Exception e)
         {
@@ -383,9 +379,9 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
             finally
             {
                 closeSession(session);
+                // close all our connections once we are done.
+                closeInternal();
             }
-            // close all our connections once we are done.
-            closeInternal();
         }
 
         /** get prepared statement id from cache, otherwise prepare it from Cassandra server*/
@@ -496,31 +492,18 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf
 
     static class NativeRingCache
     {
-        private Map<TokenRange, Set<Host>> rangeMap;
-        private Metadata metadata;
+        private final Map<TokenRange, Set<Host>> rangeMap;
+        private final Metadata metadata;
         private final IPartitioner partitioner;
-        private final Configuration conf;
 
-        public NativeRingCache(Configuration conf)
+        public NativeRingCache(Configuration conf, Metadata metadata)
         {
-            this.conf = conf;
             this.partitioner = ConfigHelper.getOutputPartitioner(conf);
-            refreshEndpointMap();
-        }
-
-
-        private void refreshEndpointMap()
-        {
+            this.metadata = metadata;
             String keyspace = ConfigHelper.getOutputKeyspace(conf);
-            try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf);
-                 Session session = cluster.connect(keyspace))
-            {
-                rangeMap = new HashMap<>();
-                metadata = session.getCluster().getMetadata();
-                Set<TokenRange> ranges = metadata.getTokenRanges();
-                for (TokenRange range : ranges)
-                    rangeMap.put(range, metadata.getReplicas(keyspace, range));
-            }
+            this.rangeMap = metadata.getTokenRanges()
+                                    .stream()
+                                    .collect(toMap(p -> p, p -> metadata.getReplicas('"' + keyspace + '"', p)));
         }
 
         public TokenRange getRange(ByteBuffer key)
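Two details in the CqlRecordWriter hunks are easy to miss. First, closeInternal() moves inside the finally block, so connections are released even when the write loop throws, not only on the happy path. Second, keyspace lookups are now quoted (Metadata.quote(keyspace) in the constructor, '"' + keyspace + '"' in NativeRingCache), since the driver's metadata methods treat an unquoted identifier as lower case and would miss a mixed-case keyspace. A small sketch of the quoting, with an invented keyspace name:

    import com.datastax.driver.core.Metadata;

    public class QuoteSketch
    {
        public static void main(String[] args)
        {
            String keyspace = "MyKeyspace"; // hypothetical mixed-case keyspace
            // Metadata.quote wraps the identifier in double quotes so that
            // metadata.getKeyspace(...)/getReplicas(...) match it case-sensitively.
            System.out.println(Metadata.quote(keyspace));
            // The NativeRingCache hunk builds the same quoted form by hand:
            System.out.println('"' + keyspace + '"');
        }
    }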
