Remove Thrift dependencies in bundled tools

patch by Philip Thompson; reviewed by Aleksey Yeschenko for CASSANDRA-8358
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f698cc22 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f698cc22 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f698cc22 Branch: refs/heads/trunk Commit: f698cc228452e847e3ad46bd8178549cf8171767 Parents: 5b61545 Author: Philip Thompson <[email protected]> Authored: Tue May 5 21:38:23 2015 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Tue May 5 23:57:39 2015 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 3 + build.xml | 8 +- lib/cassandra-driver-core-2.1.2.jar | Bin 638544 -> 0 bytes lib/cassandra-driver-core-2.1.5-shaded.jar | Bin 0 -> 1994984 bytes .../apache/cassandra/db/marshal/TypeParser.java | 71 +- .../hadoop/AbstractBulkOutputFormat.java | 73 -- .../hadoop/AbstractBulkRecordWriter.java | 239 ------- .../hadoop/AbstractColumnFamilyInputFormat.java | 245 +++---- .../AbstractColumnFamilyOutputFormat.java | 164 ----- .../AbstractColumnFamilyRecordWriter.java | 193 ----- .../cassandra/hadoop/BulkOutputFormat.java | 51 +- .../cassandra/hadoop/BulkRecordWriter.java | 145 +++- .../hadoop/ColumnFamilyInputFormat.java | 47 +- .../hadoop/ColumnFamilyOutputFormat.java | 119 +++- .../hadoop/ColumnFamilyRecordReader.java | 1 + .../hadoop/ColumnFamilyRecordWriter.java | 124 +++- .../hadoop/cql3/CqlBulkOutputFormat.java | 81 ++- .../hadoop/cql3/CqlBulkRecordWriter.java | 223 ++++-- .../cassandra/hadoop/cql3/CqlConfigHelper.java | 52 +- .../cassandra/hadoop/cql3/CqlOutputFormat.java | 76 +- .../cassandra/hadoop/cql3/CqlRecordReader.java | 93 ++- .../cassandra/hadoop/cql3/CqlRecordWriter.java | 392 ++++++---- .../cassandra/hadoop/pig/CassandraStorage.java | 706 +++++++++++++++++-- .../cassandra/hadoop/pig/CqlNativeStorage.java | 629 +++++++++++------ .../cassandra/hadoop/pig/StorageHelper.java | 121 ++++ .../cassandra/io/sstable/SSTableLoader.java | 13 +- .../cassandra/service/StorageService.java | 10 +- .../org/apache/cassandra/tools/BulkLoader.java | 209 ++---- .../utils/NativeSSTableLoaderClient.java | 126 ++++ .../org/apache/cassandra/pig/CqlTableTest.java | 81 +-- .../io/sstable/CQLSSTableWriterTest.java | 14 +- .../cassandra/io/sstable/SSTableLoaderTest.java | 7 +- 33 files changed, 2678 insertions(+), 1639 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8f89ece..ab92aa0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Remove Thrift dependencies in bundled tools (CASSANDRA-8358) * Disable memory mapping of hsperfdata file for JVM statistics (CASSANDRA-9242) * Add pre-startup checks to detect potential incompatibilities (CASSANDRA-8049) * Distinguish between null and unset in protocol v4 (CASSANDRA-7304) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 03008de..32351a1 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -75,6 +75,9 @@ New features Upgrading --------- - Pig's CqlStorage has been removed, use CqlNativeStorage instead + - Pig's CassandraStorage has been deprecated. CassandraStorage + should only be used against tables created via thrift. + Use CqlNativeStorage for all other tables. 
- IAuthenticator been updated to remove responsibility for user/role maintenance and is now solely responsible for validating credentials, This is primarily done via SASL, though an optional method exists for http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index ba99cd9..a5f195f 100644 --- a/build.xml +++ b/build.xml @@ -381,7 +381,7 @@ <dependency groupId="io.netty" artifactId="netty-all" version="4.0.23.Final" /> <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" /> <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" /> - <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.1.2" /> + <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.1.5" classifier="shaded" /> <dependency groupId="org.javassist" artifactId="javassist" version="3.18.2-GA" /> <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.3.4" /> <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" /> @@ -433,7 +433,7 @@ <dependency groupId="org.apache.pig" artifactId="pig"/> <dependency groupId="com.google.code.findbugs" artifactId="jsr305"/> <dependency groupId="org.antlr" artifactId="antlr"/> - <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core"/> + <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/> <dependency groupId="org.javassist" artifactId="javassist"/> <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/> <dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/> @@ -501,12 +501,12 @@ <dependency groupId="org.apache.thrift" artifactId="libthrift"/> <dependency groupId="org.apache.cassandra" artifactId="cassandra-thrift"/> - + <!-- don't need hadoop classes to run, but if you use the hadoop stuff --> <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/> <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/> <dependency groupId="org.apache.pig" artifactId="pig" optional="true"/> - <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" optional="true"/> + <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded" optional="true"/> <!-- don't need jna to run, but nice to have --> http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/lib/cassandra-driver-core-2.1.2.jar ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-core-2.1.2.jar b/lib/cassandra-driver-core-2.1.2.jar deleted file mode 100644 index 2095c05..0000000 Binary files a/lib/cassandra-driver-core-2.1.2.jar and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/lib/cassandra-driver-core-2.1.5-shaded.jar ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-core-2.1.5-shaded.jar b/lib/cassandra-driver-core-2.1.5-shaded.jar new file mode 100644 index 0000000..bb83fb5 Binary files /dev/null and b/lib/cassandra-driver-core-2.1.5-shaded.jar differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/db/marshal/TypeParser.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/marshal/TypeParser.java b/src/java/org/apache/cassandra/db/marshal/TypeParser.java index ad7ffed..faa678e 100644 --- a/src/java/org/apache/cassandra/db/marshal/TypeParser.java +++ b/src/java/org/apache/cassandra/db/marshal/TypeParser.java @@ -21,13 +21,9 @@ import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; +import org.apache.cassandra.cql3.CQL3Type; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -42,7 +38,7 @@ public class TypeParser private int idx; // A cache of parsed string, specially useful for DynamicCompositeType - private static final Map<String, AbstractType<?>> cache = new HashMap<String, AbstractType<?>>(); + private static final Map<String, AbstractType<?>> cache = new HashMap<>(); public static final TypeParser EMPTY_PARSER = new TypeParser("", 0); @@ -98,9 +94,48 @@ public class TypeParser return parse(compareWith == null ? null : compareWith.toString()); } - public static String getShortName(AbstractType<?> type) + public static String parseCqlNativeType(String str) { - return type.getClass().getSimpleName(); + return CQL3Type.Native.valueOf(str.trim().toUpperCase(Locale.ENGLISH)).getType().toString(); + } + + public static String parseCqlCollectionOrFrozenType(String str) throws SyntaxException + { + str = str.trim().toLowerCase(); + switch (str) + { + case "map": return "MapType"; + case "set": return "SetType"; + case "list": return "ListType"; + case "frozen": return "FrozenType"; + default: throw new SyntaxException("Invalid type name" + str); + } + } + + /** + * Turns user facing type names into Abstract Types, 'text' -> UTF8Type + */ + public static AbstractType<?> parseCqlName(String str) throws SyntaxException, ConfigurationException + { + return parse(parseCqlNameRecurse(str)); + } + + private static String parseCqlNameRecurse(String str) throws SyntaxException + { + if (str.indexOf(',') >= 0 && (!str.contains("<") || (str.indexOf(',') < str.indexOf('<')))) + { + String[] parseString = str.split(",", 2); + return parseCqlNameRecurse(parseString[0]) + "," + parseCqlNameRecurse(parseString[1]); + } + else if (str.contains("<")) + { + String[] parseString = str.trim().split("<", 2); + return parseCqlCollectionOrFrozenType(parseString[0]) + "(" + parseCqlNameRecurse(parseString[1].substring(0, parseString[1].length()-1)) + ")"; + } + else + { + return parseCqlNativeType(str); + } } /** @@ -126,7 +161,7 @@ public class TypeParser if (str.charAt(idx) != '(') throw new IllegalStateException(); - Map<String, String> map = new HashMap<String, String>(); + Map<String, String> map = new HashMap<>(); ++idx; // skipping '(' while (skipBlankAndComma()) @@ -157,7 +192,7 @@ public class TypeParser public List<AbstractType<?>> getTypeParameters() throws SyntaxException, ConfigurationException { - List<AbstractType<?>> list = new ArrayList<AbstractType<?>>(); + List<AbstractType<?>> list = new ArrayList<>(); if (isEOS()) return list; @@ -191,7 +226,7 @@ public class TypeParser public Map<Byte, AbstractType<?>> getAliasParameters() throws SyntaxException, ConfigurationException { - Map<Byte, AbstractType<?>> map = new HashMap<Byte, AbstractType<?>>(); + Map<Byte, AbstractType<?>> map 
= new HashMap<>(); if (isEOS()) return map; @@ -384,11 +419,7 @@ public class TypeParser Field field = typeClass.getDeclaredField("instance"); return (AbstractType<?>) field.get(null); } - catch (NoSuchFieldException e) - { - throw new ConfigurationException("Invalid comparator class " + typeClass.getName() + ": must define a public static instance field or a public static method getInstance(TypeParser)."); - } - catch (IllegalAccessException e) + catch (NoSuchFieldException | IllegalAccessException e) { throw new ConfigurationException("Invalid comparator class " + typeClass.getName() + ": must define a public static instance field or a public static method getInstance(TypeParser)."); } @@ -489,12 +520,6 @@ public class TypeParser return str.substring(i, idx); } - public char readNextChar() - { - skipBlank(); - return str.charAt(idx++); - } - /** * Helper function to ease the writing of AbstractType.toString() methods. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java deleted file mode 100644 index c0e91da..0000000 --- a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.hadoop; - - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.*; - -public abstract class AbstractBulkOutputFormat<K, V> extends OutputFormat<K, V> - implements org.apache.hadoop.mapred.OutputFormat<K, V> -{ - @Override - public void checkOutputSpecs(JobContext context) - { - checkOutputSpecs(HadoopCompat.getConfiguration(context)); - } - - private void checkOutputSpecs(Configuration conf) - { - if (ConfigHelper.getOutputKeyspace(conf) == null) - { - throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()"); - } - } - - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException - { - return new NullOutputCommitter(); - } - - /** Fills the deprecated OutputFormat interface for streaming. 
*/ - @Deprecated - public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException - { - checkOutputSpecs(job); - } - - public static class NullOutputCommitter extends OutputCommitter - { - public void abortTask(TaskAttemptContext taskContext) { } - - public void cleanupJob(JobContext jobContext) { } - - public void commitTask(TaskAttemptContext taskContext) { } - - public boolean needsTaskCommit(TaskAttemptContext taskContext) - { - return false; - } - - public void setupJob(JobContext jobContext) { } - - public void setupTask(TaskAttemptContext taskContext) { } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java deleted file mode 100644 index 5ba0a96..0000000 --- a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.hadoop; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.*; -import java.util.concurrent.*; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.auth.PasswordAuthenticator; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.Config; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.SSTableLoader; -import org.apache.cassandra.streaming.StreamState; -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.OutputHandler; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.util.Progressable; - -public abstract class AbstractBulkRecordWriter<K, V> extends RecordWriter<K, V> -implements org.apache.hadoop.mapred.RecordWriter<K, V> -{ - public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir"; - public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize"; - public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits"; - public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts"; - - private final Logger logger = LoggerFactory.getLogger(AbstractBulkRecordWriter.class); - - protected final Configuration conf; - protected final int maxFailures; - protected final int bufferSize; - protected Closeable writer; - protected SSTableLoader loader; - protected Progressable progress; - protected TaskAttemptContext context; - - protected AbstractBulkRecordWriter(TaskAttemptContext context) - { - this(HadoopCompat.getConfiguration(context)); - this.context = context; - } - - protected AbstractBulkRecordWriter(Configuration conf, Progressable progress) - { - this(conf); - this.progress = progress; - } - - protected AbstractBulkRecordWriter(Configuration conf) - { - Config.setOutboundBindAny(true); - this.conf = conf; - DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0"))); - maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0")); - bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")); - } - - protected String getOutputLocation() throws IOException - { - String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir")); - if (dir == null) - throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION); - return dir; - } - - @Override - public void close(TaskAttemptContext context) throws IOException, InterruptedException - { - close(); - } - - /** Fills the deprecated RecordWriter interface for streaming. 
*/ - @Deprecated - public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException - { - close(); - } - - private void close() throws IOException - { - if (writer != null) - { - writer.close(); - Future<StreamState> future = loader.stream(); - while (true) - { - try - { - future.get(1000, TimeUnit.MILLISECONDS); - break; - } - catch (ExecutionException | TimeoutException te) - { - if (null != progress) - progress.progress(); - if (null != context) - HadoopCompat.progress(context); - } - catch (InterruptedException e) - { - throw new IOException(e); - } - } - if (loader.getFailedHosts().size() > 0) - { - if (loader.getFailedHosts().size() > maxFailures) - throw new IOException("Too many hosts failed: " + loader.getFailedHosts()); - else - logger.warn("Some hosts failed: {}", loader.getFailedHosts()); - } - } - } - - public static class ExternalClient extends SSTableLoader.Client - { - private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>(); - private final Configuration conf; - private final String hostlist; - private final int rpcPort; - private final String username; - private final String password; - - public ExternalClient(Configuration conf) - { - super(); - this.conf = conf; - this.hostlist = ConfigHelper.getOutputInitialAddress(conf); - this.rpcPort = ConfigHelper.getOutputRpcPort(conf); - this.username = ConfigHelper.getOutputKeyspaceUserName(conf); - this.password = ConfigHelper.getOutputKeyspacePassword(conf); - } - - public void init(String keyspace) - { - String[] nodes = hostlist.split(","); - Set<InetAddress> hosts = new HashSet<InetAddress>(nodes.length); - for (String node : nodes) - { - try - { - hosts.add(InetAddress.getByName(node)); - } - catch (UnknownHostException e) - { - throw new RuntimeException(e); - } - } - Iterator<InetAddress> hostiter = hosts.iterator(); - while (hostiter.hasNext()) - { - try - { - InetAddress host = hostiter.next(); - Cassandra.Client client = ConfigHelper.createConnection(conf, host.getHostAddress(), rpcPort); - - // log in - client.set_keyspace(keyspace); - if (username != null) - { - Map<String, String> creds = new HashMap<String, String>(); - creds.put(PasswordAuthenticator.USERNAME_KEY, username); - creds.put(PasswordAuthenticator.PASSWORD_KEY, password); - AuthenticationRequest authRequest = new AuthenticationRequest(creds); - client.login(authRequest); - } - - List<TokenRange> tokenRanges = client.describe_ring(keyspace); - List<KsDef> ksDefs = client.describe_keyspaces(); - - setPartitioner(client.describe_partitioner()); - Token.TokenFactory tkFactory = getPartitioner().getTokenFactory(); - - for (TokenRange tr : tokenRanges) - { - Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token)); - for (String ep : tr.endpoints) - { - addRangeForEndpoint(range, InetAddress.getByName(ep)); - } - } - - for (KsDef ksDef : ksDefs) - { - Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size()); - for (CfDef cfDef : ksDef.cf_defs) - cfs.put(cfDef.name, ThriftConversion.fromThrift(cfDef)); - knownCfs.put(ksDef.name, cfs); - } - break; - } - catch (Exception e) - { - if (!hostiter.hasNext()) - throw new RuntimeException("Could not retrieve endpoint ranges: ", e); - } - } - } - - public CFMetaData getCFMetaData(String keyspace, String cfName) - { - Map<String, CFMetaData> cfs = knownCfs.get(keyspace); - return cfs != null ? 
cfs.get(cfName) : null; - } - } - - public static class NullOutputHandler implements OutputHandler - { - public void output(String msg) {} - public void debug(String msg) {} - public void warn(String msg) {} - public void warn(String msg, Throwable th) {} - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java index 691bd76..2ef4cf4 100644 --- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java @@ -18,31 +18,27 @@ package org.apache.cassandra.hadoop; import java.io.IOException; -import java.net.InetAddress; import java.util.*; import java.util.concurrent.*; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.auth.PasswordAuthenticator; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TokenRange; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.thrift.*; +import org.apache.cassandra.hadoop.cql3.*; +import org.apache.cassandra.thrift.KeyRange; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.*; -import org.apache.thrift.TApplicationException; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; - public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y> { @@ -51,7 +47,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat< public static final String MAPRED_TASK_ID = "mapred.task.id"; // The simple fact that we need this is because the old Hadoop API wants us to "write" // to the key and value whereas the new asks for it. - // I choose 8kb as the default max key size (instanciated only once), but you can + // I choose 8kb as the default max key size (instantiated only once), but you can // override it in your jobConf with this setting. 
public static final String CASSANDRA_HADOOP_MAX_KEY_SIZE = "cassandra.hadoop.max_key_size"; public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192; @@ -59,6 +55,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat< private String keyspace; private String cfName; private IPartitioner partitioner; + private Session session; protected void validateConfiguration(Configuration conf) { @@ -72,57 +69,27 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat< throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner"); } - public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception - { - logger.debug("Creating authenticated client for CF input format"); - TTransport transport; - try - { - transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port); - } - catch (Exception e) - { - throw new TTransportException("Failed to open a transport to " + location + ":" + port + ".", e); - } - TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true); - Cassandra.Client client = new Cassandra.Client(binaryProtocol); - - // log in - client.set_keyspace(ConfigHelper.getInputKeyspace(conf)); - if ((ConfigHelper.getInputKeyspaceUserName(conf) != null) && (ConfigHelper.getInputKeyspacePassword(conf) != null)) - { - Map<String, String> creds = new HashMap<String, String>(); - creds.put(PasswordAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf)); - creds.put(PasswordAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf)); - AuthenticationRequest authRequest = new AuthenticationRequest(creds); - client.login(authRequest); - } - logger.debug("Authenticated client for CF input format created successfully"); - return client; - } - public List<InputSplit> getSplits(JobContext context) throws IOException { - Configuration conf = HadoopCompat.getConfiguration(context);; + Configuration conf = HadoopCompat.getConfiguration(context); validateConfiguration(conf); - // cannonical ranges and nodes holding replicas - List<TokenRange> masterRangeNodes = getRangeMap(conf); - keyspace = ConfigHelper.getInputKeyspace(conf); cfName = ConfigHelper.getInputColumnFamily(conf); partitioner = ConfigHelper.getInputPartitioner(conf); logger.debug("partitioner is {}", partitioner); + // canonical ranges and nodes holding replicas + Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(conf, keyspace); - // cannonical ranges, split into pieces, fetching the splits in parallel + // canonical ranges, split into pieces, fetching the splits in parallel ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); - List<InputSplit> splits = new ArrayList<InputSplit>(); + List<InputSplit> splits = new ArrayList<>(); try { - List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>(); + List<Future<List<InputSplit>>> splitfutures = new ArrayList<>(); KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf); Range<Token> jobRange = null; if (jobKeyRange != null) @@ -130,7 +97,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat< if (jobKeyRange.start_key != null) { if (!partitioner.preservesOrder()) - throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving paritioner"); + throw new UnsupportedOperationException("KeyRange 
based on keys can only be used with a order preserving partitioner"); if (jobKeyRange.start_token != null) throw new IllegalArgumentException("only start_key supported"); if (jobKeyRange.end_token != null) @@ -149,26 +116,25 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat< } } - for (TokenRange range : masterRangeNodes) + session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect(); + Metadata metadata = session.getCluster().getMetadata(); + + for (TokenRange range : masterRangeNodes.keySet()) { if (jobRange == null) { - // for each range, pick a live owner and ask it to compute bite-sized splits - splitfutures.add(executor.submit(new SplitCallable(range, conf))); + // for each tokenRange, pick a live owner and ask it to compute bite-sized splits + splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf))); } else { - Range<Token> dhtRange = new Range<Token>(partitioner.getTokenFactory().fromString(range.start_token), - partitioner.getTokenFactory().fromString(range.end_token)); - - if (dhtRange.intersects(jobRange)) + TokenRange jobTokenRange = rangeToTokenRange(metadata, jobRange); + if (range.intersects(jobTokenRange)) { - for (Range<Token> intersection: dhtRange.intersectionWith(jobRange)) + for (TokenRange intersection: range.intersectWith(jobTokenRange)) { - range.start_token = partitioner.getTokenFactory().toString(intersection.left); - range.end_token = partitioner.getTokenFactory().toString(intersection.right); - // for each range, pick a live owner and ask it to compute bite-sized splits - splitfutures.add(executor.submit(new SplitCallable(range, conf))); + // for each tokenRange, pick a live owner and ask it to compute bite-sized splits + splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf))); } } } @@ -197,53 +163,53 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat< return splits; } + private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range) + { + return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)), + metadata.newToken(partitioner.getTokenFactory().toString(range.right))); + } + /** - * Gets a token range and splits it up according to the suggested + * Gets a token tokenRange and splits it up according to the suggested * size into input splits that Hadoop can use. 
*/ class SplitCallable implements Callable<List<InputSplit>> { - private final TokenRange range; + private final TokenRange tokenRange; + private final Set<Host> hosts; private final Configuration conf; - public SplitCallable(TokenRange tr, Configuration conf) + public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf) { - this.range = tr; + this.tokenRange = tr; + this.hosts = hosts; this.conf = conf; } public List<InputSplit> call() throws Exception { - ArrayList<InputSplit> splits = new ArrayList<InputSplit>(); - List<CfSplit> subSplits = getSubSplits(keyspace, cfName, range, conf); - assert range.rpc_endpoints.size() == range.endpoints.size() : "rpc_endpoints size must match endpoints size"; + ArrayList<InputSplit> splits = new ArrayList<>(); + Map<TokenRange, Long> subSplits; + subSplits = getSubSplits(keyspace, cfName, tokenRange, conf); // turn the sub-ranges into InputSplits - String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]); + String[] endpoints = new String[hosts.size()]; + // hadoop needs hostname, not ip int endpointIndex = 0; - for (String endpoint: range.rpc_endpoints) - { - String endpoint_address = endpoint; - if (endpoint_address == null || endpoint_address.equals("0.0.0.0")) - endpoint_address = range.endpoints.get(endpointIndex); - endpoints[endpointIndex++] = InetAddress.getByName(endpoint_address).getHostName(); - } + for (Host endpoint : hosts) + endpoints[endpointIndex++] = endpoint.getAddress().getHostName(); - Token.TokenFactory factory = partitioner.getTokenFactory(); - for (CfSplit subSplit : subSplits) + for (TokenRange subSplit : subSplits.keySet()) { - Token left = factory.fromString(subSplit.getStart_token()); - Token right = factory.fromString(subSplit.getEnd_token()); - Range<Token> range = new Range<Token>(left, right); - List<Range<Token>> ranges = range.isWrapAround() ? 
range.unwrap() : ImmutableList.of(range); - for (Range<Token> subrange : ranges) + List<TokenRange> ranges = subSplit.unwrap(); + for (TokenRange subrange : ranges) { ColumnFamilySplit split = new ColumnFamilySplit( - factory.toString(subrange.left), - factory.toString(subrange.right), - subSplit.getRow_count(), + subrange.getStart().toString().substring(2), + subrange.getEnd().toString().substring(2), + subSplits.get(subSplit), endpoints); logger.debug("adding {}", split); @@ -254,80 +220,63 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat< } } - private List<CfSplit> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException + private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException { - int splitsize = ConfigHelper.getInputSplitSize(conf); - for (int i = 0; i < range.rpc_endpoints.size(); i++) + int splitSize = ConfigHelper.getInputSplitSize(conf); + try { - String host = range.rpc_endpoints.get(i); - - if (host == null || host.equals("0.0.0.0")) - host = range.endpoints.get(i); - - try - { - Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf)); - client.set_keyspace(keyspace); - - try - { - return client.describe_splits_ex(cfName, range.start_token, range.end_token, splitsize); - } - catch (TApplicationException e) - { - // fallback to guessing split size if talking to a server without describe_splits_ex method - if (e.getType() == TApplicationException.UNKNOWN_METHOD) - { - List<String> splitPoints = client.describe_splits(cfName, range.start_token, range.end_token, splitsize); - return tokenListToSplits(splitPoints, splitsize); - } - throw e; - } - } - catch (IOException e) - { - logger.debug("failed connect to endpoint {}", host, e); - } - catch (InvalidRequestException e) - { - throw new RuntimeException(e); - } - catch (TException e) - { - throw new RuntimeException(e); - } + return describeSplits(keyspace, cfName, range, splitSize); + } + catch (Exception e) + { + throw new RuntimeException(e); } - throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ",")); } - private List<CfSplit> tokenListToSplits(List<String> splitTokens, int splitsize) + private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace) { - List<CfSplit> splits = Lists.newArrayListWithExpectedSize(splitTokens.size() - 1); - for (int j = 0; j < splitTokens.size() - 1; j++) - splits.add(new CfSplit(splitTokens.get(j), splitTokens.get(j + 1), splitsize)); - return splits; + Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect(); + + Map<TokenRange, Set<Host>> map = new HashMap<>(); + Metadata metadata = session.getCluster().getMetadata(); + for (TokenRange tokenRange : metadata.getTokenRanges()) + map.put(tokenRange, metadata.getReplicas(keyspace, tokenRange)); + return map; } - private List<TokenRange> getRangeMap(Configuration conf) throws IOException + private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize) { - Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf); - - List<TokenRange> map; - try + String query = String.format("SELECT mean_partition_size, partitions_count " + + "FROM %s.%s " + + "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? 
AND range_end = ?", + SystemKeyspace.NAME, + SystemKeyspace.SIZE_ESTIMATES); + + ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString()); + + Row row = resultSet.one(); + // If we have no data on this split, return the full split i.e., do not sub-split + // Assume smallest granularity of partition count available from CASSANDRA-7688 + if (row == null) { - map = client.describe_local_ring(ConfigHelper.getInputKeyspace(conf)); + Map<TokenRange, Long> wrappedTokenRange = new HashMap<>(); + wrappedTokenRange.put(tokenRange, (long) 128); + return wrappedTokenRange; } - catch (TException e) - { - throw new RuntimeException(e); - } - - return map; + + long meanPartitionSize = row.getLong("mean_partition_size"); + long partitionCount = row.getLong("partitions_count"); + + int splitCount = (int)((meanPartitionSize * partitionCount) / splitSize); + List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount); + Map<TokenRange, Long> rangesWithLength = new HashMap<>(); + for (TokenRange range : splitRanges) + rangesWithLength.put(range, partitionCount/splitCount); + + return rangesWithLength; } - // // Old Hadoop API - // public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java deleted file mode 100644 index 03d0045..0000000 --- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.hadoop; - - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.auth.PasswordAuthenticator; -import org.apache.cassandra.thrift.AuthenticationRequest; -import org.apache.cassandra.thrift.Cassandra; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.*; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TTransport; - -/** - * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific - * OutputFormat that allows reduce tasks to store keys (and corresponding - * values) as Cassandra rows (and respective columns) in a given - * ColumnFamily. - * - * <p> - * As is the case with the {@link ColumnFamilyInputFormat}, you need to set the - * Keyspace and ColumnFamily in your - * Hadoop job Configuration. The {@link ConfigHelper} class, through its - * {@link ConfigHelper#setOutputColumnFamily} method, is provided to make this - * simple. - * </p> - * - * <p> - * For the sake of performance, this class employs a lazy write-back caching - * mechanism, where its record writer batches mutations created based on the - * reduce's inputs (in a task-specific map), and periodically makes the changes - * official by sending a batch mutate request to Cassandra. - * </p> - * @param <Y> - */ -public abstract class AbstractColumnFamilyOutputFormat<K, Y> extends OutputFormat<K, Y> implements org.apache.hadoop.mapred.OutputFormat<K, Y> -{ - public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold"; - public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size"; - private static final Logger logger = LoggerFactory.getLogger(AbstractColumnFamilyOutputFormat.class); - - - /** - * Check for validity of the output-specification for the job. - * - * @param context - * information about the job - */ - public void checkOutputSpecs(JobContext context) - { - checkOutputSpecs(HadoopCompat.getConfiguration(context)); - } - - protected void checkOutputSpecs(Configuration conf) - { - if (ConfigHelper.getOutputKeyspace(conf) == null) - throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()"); - if (ConfigHelper.getOutputPartitioner(conf) == null) - throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster"); - if (ConfigHelper.getOutputInitialAddress(conf) == null) - throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node"); - } - - /** Fills the deprecated OutputFormat interface for streaming. */ - @Deprecated - public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException - { - checkOutputSpecs(job); - } - - /** - * The OutputCommitter for this format does not write any data to the DFS. - * - * @param context - * the task context - * @return an output committer - * @throws IOException - * @throws InterruptedException - */ - public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException - { - return new NullOutputCommitter(); - } - - /** - * Connects to the given server:port and returns a client based on the given socket that points to the configured - * keyspace, and is logged in with the configured credentials. 
- * - * @param host fully qualified host name to connect to - * @param port RPC port of the server - * @param conf a job configuration - * @return a cassandra client - * @throws Exception set of thrown exceptions may be implementation defined, - * depending on the used transport factory - */ - public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception - { - logger.debug("Creating authenticated client for CF output format"); - TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port); - TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true); - Cassandra.Client client = new Cassandra.Client(binaryProtocol); - client.set_keyspace(ConfigHelper.getOutputKeyspace(conf)); - String user = ConfigHelper.getOutputKeyspaceUserName(conf); - String password = ConfigHelper.getOutputKeyspacePassword(conf); - if ((user != null) && (password != null)) - login(user, password, client); - - logger.debug("Authenticated client for CF output format created successfully"); - return client; - } - - public static void login(String user, String password, Cassandra.Client client) throws Exception - { - Map<String, String> creds = new HashMap<String, String>(); - creds.put(PasswordAuthenticator.USERNAME_KEY, user); - creds.put(PasswordAuthenticator.PASSWORD_KEY, password); - AuthenticationRequest authRequest = new AuthenticationRequest(creds); - client.login(authRequest); - } - - /** - * An {@link OutputCommitter} that does nothing. - */ - private static class NullOutputCommitter extends OutputCommitter - { - public void abortTask(TaskAttemptContext taskContext) { } - - public void cleanupJob(JobContext jobContext) { } - - public void commitTask(TaskAttemptContext taskContext) { } - - public boolean needsTaskCommit(TaskAttemptContext taskContext) - { - return false; - } - - public void setupJob(JobContext jobContext) { } - - public void setupTask(TaskAttemptContext taskContext) { } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java deleted file mode 100644 index cb44beb..0000000 --- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.hadoop; - - -import java.io.IOException; -import java.net.InetAddress; -import java.util.*; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.cassandra.client.RingCache; -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.thrift.transport.TTransport; -import org.apache.hadoop.util.Progressable; - - -/** - * The <code>ColumnFamilyRecordWriter</code> maps the output <key, value> - * pairs to a Cassandra column family. In particular, it applies all mutations - * in the value, which it associates with the key, and in turn the responsible - * endpoint. - * - * <p> - * Furthermore, this writer groups the mutations by the endpoint responsible for - * the rows being affected. This allows the mutations to be executed in parallel, - * directly to a responsible endpoint. - * </p> - * - * @see ColumnFamilyOutputFormat - */ -public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWriter<K, Y> implements org.apache.hadoop.mapred.RecordWriter<K, Y> -{ - // The configuration this writer is associated with. - protected final Configuration conf; - - // The ring cache that describes the token ranges each node in the ring is - // responsible for. This is what allows us to group the mutations by - // the endpoints they should be targeted at. The targeted endpoint - // essentially - // acts as the primary replica for the rows being affected by the mutations. - protected final RingCache ringCache; - - // The number of mutations to buffer per endpoint - protected final int queueSize; - - protected final long batchThreshold; - - protected final ConsistencyLevel consistencyLevel; - protected Progressable progressable; - protected TaskAttemptContext context; - - protected AbstractColumnFamilyRecordWriter(Configuration conf) - { - this.conf = conf; - this.ringCache = new RingCache(conf); - this.queueSize = conf.getInt(AbstractColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors()); - batchThreshold = conf.getLong(AbstractColumnFamilyOutputFormat.BATCH_THRESHOLD, 32); - consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf)); - } - - /** - * Close this <code>RecordWriter</code> to future operations, but not before - * flushing out the batched mutations. - * - * @param context the context of the task - * @throws IOException - */ - public void close(TaskAttemptContext context) throws IOException, InterruptedException - { - close(); - } - - /** Fills the deprecated RecordWriter interface for streaming. */ - @Deprecated - public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException - { - close(); - } - - protected abstract void close() throws IOException; - - /** - * A client that runs in a threadpool and connects to the list of endpoints for a particular - * range. Mutations for keys in that range are sent to this client via a queue. 
- */ - public abstract class AbstractRangeClient<K> extends Thread - { - // The list of endpoints for this range - protected final List<InetAddress> endpoints; - // A bounded queue of incoming mutations for this range - protected final BlockingQueue<K> queue = new ArrayBlockingQueue<K>(queueSize); - - protected volatile boolean run = true; - // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing - // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls, - // when the client is closed. - protected volatile IOException lastException; - - protected Cassandra.Client client; - - /** - * Constructs an {@link AbstractRangeClient} for the given endpoints. - * @param endpoints the possible endpoints to execute the mutations on - */ - public AbstractRangeClient(List<InetAddress> endpoints) - { - super("client-" + endpoints); - this.endpoints = endpoints; - } - - /** - * enqueues the given value to Cassandra - */ - public void put(K value) throws IOException - { - while (true) - { - if (lastException != null) - throw lastException; - try - { - if (queue.offer(value, 100, TimeUnit.MILLISECONDS)) - break; - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - } - } - - public void close() throws IOException - { - // stop the run loop. this will result in closeInternal being called by the time join() finishes. - run = false; - interrupt(); - try - { - this.join(); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - - if (lastException != null) - throw lastException; - } - - protected void closeInternal() - { - if (client != null) - { - TTransport transport = client.getOutputProtocol().getTransport(); - if (transport.isOpen()) - transport.close(); - } - } - - /** - * Loops collecting mutations from the queue and sending to Cassandra - */ - public abstract void run(); - - @Override - public String toString() - { - return "#<Client for " + endpoints + ">"; - } - } -} - http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java index f5a5a8d..5282279 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java @@ -23,9 +23,12 @@ import java.nio.ByteBuffer; import java.util.List; import org.apache.cassandra.thrift.Mutation; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.*; -public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer,List<Mutation>> +@Deprecated +public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>> + implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>> { /** Fills the deprecated OutputFormat interface for streaming. 
*/ @Deprecated @@ -39,4 +42,50 @@ public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer,List<M { return new BulkRecordWriter(context); } + + + @Override + public void checkOutputSpecs(JobContext context) + { + checkOutputSpecs(HadoopCompat.getConfiguration(context)); + } + + private void checkOutputSpecs(Configuration conf) + { + if (ConfigHelper.getOutputKeyspace(conf) == null) + { + throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()"); + } + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException + { + return new NullOutputCommitter(); + } + + /** Fills the deprecated OutputFormat interface for streaming. */ + @Deprecated + public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException + { + checkOutputSpecs(job); + } + + public static class NullOutputCommitter extends OutputCommitter + { + public void abortTask(TaskAttemptContext taskContext) { } + + public void cleanupJob(JobContext jobContext) { } + + public void commitTask(TaskAttemptContext taskContext) { } + + public boolean needsTaskCommit(TaskAttemptContext taskContext) + { + return false; + } + + public void setupJob(JobContext jobContext) { } + + public void setupTask(TaskAttemptContext taskContext) { } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java index d67b856..6b9ecb5 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java @@ -17,24 +17,57 @@ */ package org.apache.cassandra.hadoop; +import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.List; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.hadoop.cql3.CqlConfigHelper; import org.apache.cassandra.io.sstable.SSTableLoader; import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter; +import org.apache.cassandra.streaming.StreamState; import org.apache.cassandra.thrift.Column; import org.apache.cassandra.thrift.CounterColumn; import org.apache.cassandra.thrift.Mutation; +import org.apache.cassandra.utils.NativeSSTableLoaderClient; +import org.apache.cassandra.utils.OutputHandler; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.util.Progressable; -public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer, List<Mutation>> +@Deprecated +public final class BulkRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>> + implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>> { + public 
final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir"; + public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize"; + public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits"; + public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts"; + + private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class); + + protected final Configuration conf; + protected final int maxFailures; + protected final int bufferSize; + protected Closeable writer; + protected SSTableLoader loader; + protected Progressable progress; + protected TaskAttemptContext context; private File outputDir; @@ -55,17 +88,32 @@ public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer, BulkRecordWriter(TaskAttemptContext context) { - super(context); + + this(HadoopCompat.getConfiguration(context)); + this.context = context; } BulkRecordWriter(Configuration conf, Progressable progress) { - super(conf, progress); + this(conf); + this.progress = progress; } BulkRecordWriter(Configuration conf) { - super(conf); + Config.setOutboundBindAny(true); + this.conf = conf; + DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0"))); + maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0")); + bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")); + } + + protected String getOutputLocation() throws IOException + { + String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir")); + if (dir == null) + throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION); + return dir; } private void setTypes(Mutation mutation) @@ -115,6 +163,54 @@ public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer, } @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException + { + close(); + } + + /** Fills the deprecated RecordWriter interface for streaming. 
*/ + @Deprecated + public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException + { + close(); + } + + private void close() throws IOException + { + if (writer != null) + { + writer.close(); + Future<StreamState> future = loader.stream(); + while (true) + { + try + { + future.get(1000, TimeUnit.MILLISECONDS); + break; + } + catch (ExecutionException | TimeoutException te) + { + if (null != progress) + progress.progress(); + if (null != context) + HadoopCompat.progress(context); + } + catch (InterruptedException e) + { + throw new IOException(e); + } + } + if (loader.getFailedHosts().size() > 0) + { + if (loader.getFailedHosts().size() > maxFailures) + throw new IOException("Too many hosts failed: " + loader.getFailedHosts()); + else + logger.warn("Some hosts failed: {}", loader.getFailedHosts()); + } + } + } + + @Override public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException { setTypes(value.get(0)); @@ -158,4 +254,43 @@ public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer, HadoopCompat.progress(context); } } + + public static class ExternalClient extends NativeSSTableLoaderClient + { + public ExternalClient(Configuration conf) + { + super(resolveHostAddresses(conf), + CqlConfigHelper.getOutputNativePort(conf), + ConfigHelper.getOutputKeyspaceUserName(conf), + ConfigHelper.getOutputKeyspacePassword(conf), + CqlConfigHelper.getSSLOptions(conf).orNull()); + } + + private static Collection<InetAddress> resolveHostAddresses(Configuration conf) + { + Set<InetAddress> addresses = new HashSet<>(); + + for (String host : ConfigHelper.getOutputInitialAddress(conf).split(",")) + { + try + { + addresses.add(InetAddress.getByName(host)); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + return addresses; + } + } + + public static class NullOutputHandler implements OutputHandler + { + public void output(String msg) {} + public void debug(String msg) {} + public void warn(String msg) {} + public void warn(String msg, Throwable th) {} + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java index 686d486..88dd2e2 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java @@ -21,11 +21,23 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.auth.PasswordAuthenticator; import org.apache.cassandra.db.Cell; +import org.apache.cassandra.thrift.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; /** * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily. 
@@ -44,9 +56,40 @@ import org.apache.hadoop.mapreduce.*; * * The default split size is 64k rows. */ +@Deprecated public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, Cell>> { - + private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class); + + public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception + { + logger.debug("Creating authenticated client for CF input format"); + TTransport transport; + try + { + transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port); + } + catch (Exception e) + { + throw new TTransportException("Failed to open a transport to " + location + ":" + port + ".", e); + } + TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true); + Cassandra.Client client = new Cassandra.Client(binaryProtocol); + + // log in + client.set_keyspace(ConfigHelper.getInputKeyspace(conf)); + if ((ConfigHelper.getInputKeyspaceUserName(conf) != null) && (ConfigHelper.getInputKeyspacePassword(conf) != null)) + { + Map<String, String> creds = new HashMap<String, String>(); + creds.put(PasswordAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf)); + creds.put(PasswordAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf)); + AuthenticationRequest authRequest = new AuthenticationRequest(creds); + client.login(authRequest); + } + logger.debug("Authenticated client for CF input format created successfully"); + return client; + } + public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new ColumnFamilyRecordReader(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java index 2990bf3..94ced69 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java @@ -18,10 +18,18 @@ package org.apache.cassandra.hadoop; +import java.io.*; import java.nio.ByteBuffer; -import java.util.List; +import java.util.*; + +import org.slf4j.*; + +import org.apache.cassandra.auth.*; import org.apache.cassandra.thrift.*; +import org.apache.hadoop.conf.*; import org.apache.hadoop.mapreduce.*; +import org.apache.thrift.protocol.*; +import org.apache.thrift.transport.*; /** * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific @@ -44,8 +52,93 @@ import org.apache.hadoop.mapreduce.*; * official by sending a batch mutate request to Cassandra. 
* </p> */ -public class ColumnFamilyOutputFormat extends AbstractColumnFamilyOutputFormat<ByteBuffer,List<Mutation>> +@Deprecated +public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>> + implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>> { + public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold"; + public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size"; + + private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class); + + /** + * The OutputCommitter for this format does not write any data to the DFS. + * + * @param context + * the task context + * @return an output committer + * @throws IOException + * @throws InterruptedException + */ + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException + { + return new NullOutputCommitter(); + } + + /** + * Check for validity of the output-specification for the job. + * + * @param context + * information about the job + */ + public void checkOutputSpecs(JobContext context) + { + checkOutputSpecs(HadoopCompat.getConfiguration(context)); + } + + protected void checkOutputSpecs(Configuration conf) + { + if (ConfigHelper.getOutputKeyspace(conf) == null) + throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()"); + if (ConfigHelper.getOutputPartitioner(conf) == null) + throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster"); + if (ConfigHelper.getOutputInitialAddress(conf) == null) + throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node"); + } + + /** Fills the deprecated OutputFormat interface for streaming. */ + @Deprecated + public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException + { + checkOutputSpecs(job); + } + + /** + * Connects to the given server:port and returns a client based on the given socket that points to the configured + * keyspace, and is logged in with the configured credentials. 
+ * + * @param host fully qualified host name to connect to + * @param port RPC port of the server + * @param conf a job configuration + * @return a cassandra client + * @throws Exception set of thrown exceptions may be implementation defined, + * depending on the used transport factory + */ + public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception + { + logger.debug("Creating authenticated client for CF output format"); + TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port); + TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true); + Cassandra.Client client = new Cassandra.Client(binaryProtocol); + client.set_keyspace(ConfigHelper.getOutputKeyspace(conf)); + String user = ConfigHelper.getOutputKeyspaceUserName(conf); + String password = ConfigHelper.getOutputKeyspacePassword(conf); + if ((user != null) && (password != null)) + login(user, password, client); + + logger.debug("Authenticated client for CF output format created successfully"); + return client; + } + + public static void login(String user, String password, Cassandra.Client client) throws Exception + { + Map<String, String> creds = new HashMap<String, String>(); + creds.put(PasswordAuthenticator.USERNAME_KEY, user); + creds.put(PasswordAuthenticator.PASSWORD_KEY, password); + AuthenticationRequest authRequest = new AuthenticationRequest(creds); + client.login(authRequest); + } + /** Fills the deprecated OutputFormat interface for streaming. */ @Deprecated public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) @@ -64,4 +157,26 @@ public class ColumnFamilyOutputFormat extends AbstractColumnFamilyOutputFormat<B { return new ColumnFamilyRecordWriter(context); } + + /** + * An {@link OutputCommitter} that does nothing. 
+ */ + private static class NullOutputCommitter extends OutputCommitter + { + public void abortTask(TaskAttemptContext taskContext) { } + + public void cleanupJob(JobContext jobContext) { } + + public void commitTask(TaskAttemptContext taskContext) { } + + public boolean needsTaskCommit(TaskAttemptContext taskContext) + { + return false; + } + + public void setupJob(JobContext jobContext) { } + + public void setupTask(TaskAttemptContext taskContext) { } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java index 35437e9..d205f13 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java @@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; +@Deprecated public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java index d6a873b..31c7047 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java @@ -22,15 +22,19 @@ import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.*; +import org.apache.cassandra.client.*; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.utils.*; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.*; import org.apache.thrift.TException; import org.apache.hadoop.util.Progressable; +import org.apache.thrift.transport.*; /** @@ -47,10 +51,30 @@ import org.apache.hadoop.util.Progressable; * * @see ColumnFamilyOutputFormat */ -final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<ByteBuffer, List<Mutation>> +@Deprecated +final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>> implements + org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>> { + // The configuration this writer is associated with. + protected final Configuration conf; + + // The number of mutations to buffer per endpoint + protected final int queueSize; + + protected final long batchThreshold; + + protected final ConsistencyLevel consistencyLevel; + protected Progressable progressable; + protected TaskAttemptContext context; // handles for clients for each range running in the threadpool private final Map<Range, RangeClient> clients; + + // The ring cache that describes the token ranges each node in the ring is + // responsible for. 
This is what allows us to group the mutations by + // the endpoints they should be targeted at. The targeted endpoint + // essentially + // acts as the primary replica for the rows being affected by the mutations. + private final RingCache ringCache; /** * Upon construction, obtain the map that this writer will use to collect @@ -73,11 +97,33 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By ColumnFamilyRecordWriter(Configuration conf) { - super(conf); + this.conf = conf; + this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors()); + batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32); + consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf)); + this.ringCache = new RingCache(conf); this.clients = new HashMap<Range, RangeClient>(); } - - @Override + + /** + * Close this <code>RecordWriter</code> to future operations, but not before + * flushing out the batched mutations. + * + * @param context the context of the task + * @throws IOException + */ + public void close(TaskAttemptContext context) throws IOException, InterruptedException + { + close(); + } + + /** Fills the deprecated RecordWriter interface for streaming. */ + @Deprecated + public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException + { + close(); + } + public void close() throws IOException { // close all the clients before throwing anything @@ -138,8 +184,20 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By * A client that runs in a threadpool and connects to the list of endpoints for a particular * range. Mutations for keys in that range are sent to this client via a queue. */ - public class RangeClient extends AbstractRangeClient<Pair<ByteBuffer, Mutation>> + public class RangeClient extends Thread { + // The list of endpoints for this range + protected final List<InetAddress> endpoints; + // A bounded queue of incoming mutations for this range + protected final BlockingQueue<Pair<ByteBuffer, Mutation>> queue = new ArrayBlockingQueue<>(queueSize); + + protected volatile boolean run = true; + // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing + // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls, + // when the client is closed. + protected volatile IOException lastException; + + protected Cassandra.Client client; public final String columnFamily = ConfigHelper.getOutputColumnFamily(conf); /** @@ -148,8 +206,58 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By */ public RangeClient(List<InetAddress> endpoints) { - super(endpoints); + super("client-" + endpoints); + this.endpoints = endpoints; } + + /** + * enqueues the given value to Cassandra + */ + public void put(Pair<ByteBuffer, Mutation> value) throws IOException + { + while (true) + { + if (lastException != null) + throw lastException; + try + { + if (queue.offer(value, 100, TimeUnit.MILLISECONDS)) + break; + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + } + } + + public void close() throws IOException + { + // stop the run loop. this will result in closeInternal being called by the time join() finishes. 
+ run = false; + interrupt(); + try + { + this.join(); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + + if (lastException != null) + throw lastException; + } + + protected void closeInternal() + { + if (client != null) + { + TTransport transport = client.getOutputProtocol().getTransport(); + if (transport.isOpen()) + transport.close(); + } + } /** * Loops collecting mutations from the queue and sending to Cassandra

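For reference, a minimal driver-side sketch of how a job would feed the now-deprecated BulkOutputFormat, assuming the ConfigHelper setters mirror the getters used in the patch above; the keyspace/table names, paths, and tuning values are placeholders and are not part of this commit:

    import org.apache.cassandra.hadoop.BulkRecordWriter;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.hadoop.conf.Configuration;

    public class BulkOutputConfigSketch
    {
        public static Configuration configure(Configuration conf)
        {
            // Required by BulkOutputFormat.checkOutputSpecs(): an output keyspace must be set.
            ConfigHelper.setOutputColumnFamily(conf, "demo_ks", "demo_cf");      // hypothetical names

            // Local directory where each task writes SSTables before streaming them.
            conf.set(BulkRecordWriter.OUTPUT_LOCATION, "/tmp/bulk-sstables");
            // Buffer size in MB for the unsorted SSTable writer (the patch defaults this to 64).
            conf.set(BulkRecordWriter.BUFFER_SIZE_IN_MB, "128");
            // Outbound stream throttle in megabits/s (the patch defaults this to 0).
            conf.set(BulkRecordWriter.STREAM_THROTTLE_MBITS, "200");
            // Number of failed stream targets close() tolerates before throwing.
            conf.set(BulkRecordWriter.MAX_FAILED_HOSTS, "1");

            // ExternalClient splits this comma-separated list and resolves each host
            // for the native-protocol SSTable loader.
            ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
            return conf;
        }
    }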
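A similar sketch for the deprecated ColumnFamilyOutputFormat, covering exactly the settings its checkOutputSpecs() validates plus the optional credentials read by createAuthenticatedClient(); again, the setter names are assumed to mirror the getters shown above and the literal values are placeholders:

    import java.nio.ByteBuffer;
    import java.util.List;

    import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class ThriftOutputJobSketch
    {
        public static Job configure(Configuration conf) throws Exception
        {
            // Each setting corresponds to one checkOutputSpecs() check:
            // keyspace, partitioner, and an initial contact address must all be present.
            ConfigHelper.setOutputColumnFamily(conf, "demo_ks", "demo_cf");    // hypothetical names
            ConfigHelper.setOutputPartitioner(conf, "org.apache.cassandra.dht.Murmur3Partitioner");
            ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");

            // Optional credentials picked up by createAuthenticatedClient().
            ConfigHelper.setOutputKeyspaceUserName(conf, "cassandra");
            ConfigHelper.setOutputKeyspacePassword(conf, "cassandra");

            Job job = Job.getInstance(conf, "legacy-thrift-output");
            job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
            job.setOutputKeyClass(ByteBuffer.class);   // row key
            job.setOutputValueClass(List.class);       // List<Mutation> per key
            return job;
        }
    }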