Repository: cassandra

Updated Branches:
  refs/heads/cassandra-2.1 6a7235e9e -> 88ad4f451
  refs/heads/trunk a26f19266 -> c61784cd8
Add CqlOutputFormat

patch by Paul Pak; reviewed by Piotr Kołaczkowski for CASSANDRA-6927


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/88ad4f45
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/88ad4f45
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/88ad4f45

Branch: refs/heads/cassandra-2.1
Commit: 88ad4f4514765c62351ea02553769047a6c1e24c
Parents: 6a7235e
Author: Jonathan Ellis <[email protected]>
Authored: Wed Aug 13 11:57:43 2014 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Wed Aug 13 11:57:43 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../hadoop/AbstractBulkOutputFormat.java        |  73 ++++++
 .../hadoop/AbstractBulkRecordWriter.java        | 251 ++++++++++++++++++
 .../cassandra/hadoop/BulkOutputFormat.java      |  49 +---
 .../cassandra/hadoop/BulkRecordWriter.java      | 259 ++-----------------
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |   2 +-
 .../cassandra/hadoop/cql3/CqlOutputFormat.java  |   2 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |  10 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java  |   3 +-
 9 files changed, 358 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 69c4adc..de93018 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.1
+ * (Hadoop) Add CqlOutputFormat (CASSANDRA-6927)
  * Avoid IOOBE when building SyntaxError message snippet (CASSANDRA-7569)
  * SSTableExport uses correct validator to create string representation of
    partition keys (CASSANDRA-7498)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
new file mode 100644
index 0000000..c0e91da
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.cassandra.hadoop; + + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.*; + +public abstract class AbstractBulkOutputFormat<K, V> extends OutputFormat<K, V> + implements org.apache.hadoop.mapred.OutputFormat<K, V> +{ + @Override + public void checkOutputSpecs(JobContext context) + { + checkOutputSpecs(HadoopCompat.getConfiguration(context)); + } + + private void checkOutputSpecs(Configuration conf) + { + if (ConfigHelper.getOutputKeyspace(conf) == null) + { + throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()"); + } + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException + { + return new NullOutputCommitter(); + } + + /** Fills the deprecated OutputFormat interface for streaming. */ + @Deprecated + public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException + { + checkOutputSpecs(job); + } + + public static class NullOutputCommitter extends OutputCommitter + { + public void abortTask(TaskAttemptContext taskContext) { } + + public void cleanupJob(JobContext jobContext) { } + + public void commitTask(TaskAttemptContext taskContext) { } + + public boolean needsTaskCommit(TaskAttemptContext taskContext) + { + return false; + } + + public void setupJob(JobContext jobContext) { } + + public void setupTask(TaskAttemptContext taskContext) { } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java new file mode 100644 index 0000000..22255a6 --- /dev/null +++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.hadoop; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.cassandra.auth.IAuthenticator; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.SSTableLoader; +import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.thrift.AuthenticationRequest; +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.CfDef; +import org.apache.cassandra.thrift.KsDef; +import org.apache.cassandra.thrift.TokenRange; +import org.apache.cassandra.utils.OutputHandler; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.util.Progressable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractBulkRecordWriter<K, V> extends RecordWriter<K, V> +implements org.apache.hadoop.mapred.RecordWriter<K, V> +{ + public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir"; + public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize"; + public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits"; + public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts"; + + private final Logger logger = LoggerFactory.getLogger(AbstractBulkRecordWriter.class); + + protected final Configuration conf; + protected final int maxFailures; + protected final int bufferSize; + protected Closeable writer; + protected SSTableLoader loader; + protected Progressable progress; + protected TaskAttemptContext context; + + protected AbstractBulkRecordWriter(TaskAttemptContext context) + { + this(HadoopCompat.getConfiguration(context)); + this.context = context; + } + + protected AbstractBulkRecordWriter(Configuration conf, Progressable progress) + { + this(conf); + this.progress = progress; + } + + protected AbstractBulkRecordWriter(Configuration conf) + { + Config.setClientMode(true); + Config.setOutboundBindAny(true); + this.conf = conf; + DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0"))); + maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0")); + bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")); + } + + protected String getOutputLocation() throws IOException + { + String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir")); + if (dir == null) + throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION); + return dir; + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException + { + close(); + } + + /** Fills the deprecated RecordWriter interface for streaming. 
*/ + @Deprecated + public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException + { + close(); + } + + private void close() throws IOException + { + if (writer != null) + { + writer.close(); + Future<StreamState> future = loader.stream(); + while (true) + { + try + { + future.get(1000, TimeUnit.MILLISECONDS); + break; + } + catch (ExecutionException | TimeoutException te) + { + if (null != progress) + progress.progress(); + if (null != context) + HadoopCompat.progress(context); + } + catch (InterruptedException e) + { + throw new IOException(e); + } + } + if (loader.getFailedHosts().size() > 0) + { + if (loader.getFailedHosts().size() > maxFailures) + throw new IOException("Too many hosts failed: " + loader.getFailedHosts()); + else + logger.warn("Some hosts failed: {}", loader.getFailedHosts()); + } + } + } + + public static class ExternalClient extends SSTableLoader.Client + { + private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>(); + private final Configuration conf; + private final String hostlist; + private final int rpcPort; + private final String username; + private final String password; + + public ExternalClient(Configuration conf) + { + super(); + this.conf = conf; + this.hostlist = ConfigHelper.getOutputInitialAddress(conf); + this.rpcPort = ConfigHelper.getOutputRpcPort(conf); + this.username = ConfigHelper.getOutputKeyspaceUserName(conf); + this.password = ConfigHelper.getOutputKeyspacePassword(conf); + } + + public void init(String keyspace) + { + Set<InetAddress> hosts = new HashSet<InetAddress>(); + String[] nodes = hostlist.split(","); + for (String node : nodes) + { + try + { + hosts.add(InetAddress.getByName(node)); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + Iterator<InetAddress> hostiter = hosts.iterator(); + while (hostiter.hasNext()) + { + try + { + InetAddress host = hostiter.next(); + Cassandra.Client client = ConfigHelper.createConnection(conf, host.getHostAddress(), rpcPort); + + // log in + client.set_keyspace(keyspace); + if (username != null) + { + Map<String, String> creds = new HashMap<String, String>(); + creds.put(IAuthenticator.USERNAME_KEY, username); + creds.put(IAuthenticator.PASSWORD_KEY, password); + AuthenticationRequest authRequest = new AuthenticationRequest(creds); + client.login(authRequest); + } + + List<TokenRange> tokenRanges = client.describe_ring(keyspace); + List<KsDef> ksDefs = client.describe_keyspaces(); + + setPartitioner(client.describe_partitioner()); + Token.TokenFactory tkFactory = getPartitioner().getTokenFactory(); + + for (TokenRange tr : tokenRanges) + { + Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token)); + for (String ep : tr.endpoints) + { + addRangeForEndpoint(range, InetAddress.getByName(ep)); + } + } + + for (KsDef ksDef : ksDefs) + { + Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size()); + for (CfDef cfDef : ksDef.cf_defs) + cfs.put(cfDef.name, CFMetaData.fromThrift(cfDef)); + knownCfs.put(ksDef.name, cfs); + } + break; + } + catch (Exception e) + { + if (!hostiter.hasNext()) + throw new RuntimeException("Could not retrieve endpoint ranges: ", e); + } + } + } + + public CFMetaData getCFMetaData(String keyspace, String cfName) + { + Map<String, CFMetaData> cfs = knownCfs.get(keyspace); + return cfs != null ? 
cfs.get(cfName) : null; + } + } + + public static class NullOutputHandler implements OutputHandler + { + public void output(String msg) {} + public void debug(String msg) {} + public void warn(String msg) {} + public void warn(String msg, Throwable th) {} + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java index c3d8e05..f5a5a8d 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java @@ -23,39 +23,10 @@ import java.nio.ByteBuffer; import java.util.List; import org.apache.cassandra.thrift.Mutation; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.*; -public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>> - implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>> +public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer,List<Mutation>> { - @Override - public void checkOutputSpecs(JobContext context) - { - checkOutputSpecs(HadoopCompat.getConfiguration(context)); - } - - private void checkOutputSpecs(Configuration conf) - { - if (ConfigHelper.getOutputKeyspace(conf) == null) - { - throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()"); - } - } - - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException - { - return new NullOutputCommitter(); - } - - /** Fills the deprecated OutputFormat interface for streaming. */ - @Deprecated - public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException - { - checkOutputSpecs(job); - } - /** Fills the deprecated OutputFormat interface for streaming. 
*/ @Deprecated public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException @@ -68,22 +39,4 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>> { return new BulkRecordWriter(context); } - - public static class NullOutputCommitter extends OutputCommitter - { - public void abortTask(TaskAttemptContext taskContext) { } - - public void cleanupJob(JobContext jobContext) { } - - public void commitTask(TaskAttemptContext taskContext) { } - - public boolean needsTaskCommit(TaskAttemptContext taskContext) - { - return false; - } - - public void setupJob(JobContext jobContext) { } - - public void setupTask(TaskAttemptContext taskContext) { } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java index d6136a2..d67b856 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java @@ -19,57 +19,25 @@ package org.apache.cassandra.hadoop; import java.io.File; import java.io.IOException; -import java.net.InetAddress; import java.nio.ByteBuffer; -import java.net.UnknownHostException; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.auth.IAuthenticator; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.Config; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.SSTableLoader; import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter; -import org.apache.cassandra.streaming.StreamState; -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.OutputHandler; +import org.apache.cassandra.thrift.Column; +import org.apache.cassandra.thrift.CounterColumn; +import org.apache.cassandra.thrift.Mutation; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.thrift.protocol.*; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TSocket; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; import org.apache.hadoop.util.Progressable; -final class BulkRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>> -implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> +public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer, List<Mutation>> { - private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir"; - private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize"; - private final static String STREAM_THROTTLE_MBITS = 
"mapreduce.output.bulkoutputformat.streamthrottlembits"; - private final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts"; - private final Configuration conf; - private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class); - private SSTableSimpleUnsortedWriter writer; - private SSTableLoader loader; - private File outputdir; - private Progressable progress; - private TaskAttemptContext context; - private int maxFailures; - + private File outputDir; + + private enum CFType { NORMAL, @@ -87,31 +55,17 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> BulkRecordWriter(TaskAttemptContext context) { - this(HadoopCompat.getConfiguration(context)); - this.context = context; + super(context); } BulkRecordWriter(Configuration conf, Progressable progress) { - this(conf); - this.progress = progress; + super(conf, progress); } BulkRecordWriter(Configuration conf) { - Config.setClientMode(true); - Config.setOutboundBindAny(true); - this.conf = conf; - DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0"))); - maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0")); - } - - private String getOutputLocation() throws IOException - { - String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir")); - if (dir == null) - throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION); - return dir; + super(conf); } private void setTypes(Mutation mutation) @@ -131,26 +85,23 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> private void prepareWriter() throws IOException { - if (outputdir == null) + if (outputDir == null) { String keyspace = ConfigHelper.getOutputKeyspace(conf); //dir must be named by ks/cf for the loader - outputdir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf)); - outputdir.mkdirs(); + outputDir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf)); + outputDir.mkdirs(); } if (writer == null) { AbstractType<?> subcomparator = null; - ExternalClient externalClient = null; - String username = ConfigHelper.getOutputKeyspaceUserName(conf); - String password = ConfigHelper.getOutputKeyspacePassword(conf); if (cfType == CFType.SUPER) subcomparator = BytesType.instance; - this.writer = new SSTableSimpleUnsortedWriter( - outputdir, + writer = new SSTableSimpleUnsortedWriter( + outputDir, ConfigHelper.getOutputPartitioner(conf), ConfigHelper.getOutputKeyspace(conf), ConfigHelper.getOutputColumnFamily(conf), @@ -159,12 +110,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")), ConfigHelper.getOutputCompressionParamaters(conf)); - externalClient = new ExternalClient(ConfigHelper.getOutputInitialAddress(conf), - ConfigHelper.getOutputRpcPort(conf), - username, - password); - - this.loader = new SSTableLoader(outputdir, externalClient, new NullOutputHandler()); + this.loader = new SSTableLoader(outputDir, new ExternalClient(conf), new NullOutputHandler()); } } @@ -173,36 +119,37 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> { setTypes(value.get(0)); prepareWriter(); - writer.newRow(keybuff); + SSTableSimpleUnsortedWriter ssWriter = (SSTableSimpleUnsortedWriter) writer; + ssWriter.newRow(keybuff); for 
(Mutation mut : value) { if (cfType == CFType.SUPER) { - writer.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name); + ssWriter.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name); if (colType == ColType.COUNTER) for (CounterColumn column : mut.getColumn_or_supercolumn().getCounter_super_column().columns) - writer.addCounterColumn(column.name, column.value); + ssWriter.addCounterColumn(column.name, column.value); else { for (Column column : mut.getColumn_or_supercolumn().getSuper_column().columns) { if(column.ttl == 0) - writer.addColumn(column.name, column.value, column.timestamp); + ssWriter.addColumn(column.name, column.value, column.timestamp); else - writer.addExpiringColumn(column.name, column.value, column.timestamp, column.ttl, System.currentTimeMillis() + ((long)column.ttl * 1000)); + ssWriter.addExpiringColumn(column.name, column.value, column.timestamp, column.ttl, System.currentTimeMillis() + ((long)column.ttl * 1000)); } } } else { if (colType == ColType.COUNTER) - writer.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name, mut.getColumn_or_supercolumn().counter_column.value); + ssWriter.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name, mut.getColumn_or_supercolumn().counter_column.value); else { if(mut.getColumn_or_supercolumn().column.ttl == 0) - writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp); + ssWriter.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp); else - writer.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000)); + ssWriter.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000)); } } if (null != progress) @@ -211,158 +158,4 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> HadoopCompat.progress(context); } } - @Override - public void close(TaskAttemptContext context) throws IOException, InterruptedException - { - close(); - } - - /** Fills the deprecated RecordWriter interface for streaming. 
*/ - @Deprecated - public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException - { - close(); - } - - private void close() throws IOException - { - if (writer != null) - { - writer.close(); - Future<StreamState> future = loader.stream(); - while (true) - { - try - { - future.get(1000, TimeUnit.MILLISECONDS); - break; - } - catch (ExecutionException | TimeoutException te) - { - if (null != progress) - progress.progress(); - if (null != context) - HadoopCompat.progress(context); - } - catch (InterruptedException e) - { - throw new IOException(e); - } - } - if (loader.getFailedHosts().size() > 0) - { - if (loader.getFailedHosts().size() > maxFailures) - throw new IOException("Too many hosts failed: " + loader.getFailedHosts()); - else - logger.warn("Some hosts failed: {}", loader.getFailedHosts()); - } - } - } - - static class ExternalClient extends SSTableLoader.Client - { - private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>(); - private final String hostlist; - private final int rpcPort; - private final String username; - private final String password; - - public ExternalClient(String hostlist, int port, String username, String password) - { - super(); - this.hostlist = hostlist; - this.rpcPort = port; - this.username = username; - this.password = password; - } - - public void init(String keyspace) - { - Set<InetAddress> hosts = new HashSet<InetAddress>(); - String[] nodes = hostlist.split(","); - for (String node : nodes) - { - try - { - hosts.add(InetAddress.getByName(node)); - } - catch (UnknownHostException e) - { - throw new RuntimeException(e); - } - } - Iterator<InetAddress> hostiter = hosts.iterator(); - while (hostiter.hasNext()) - { - try - { - InetAddress host = hostiter.next(); - Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort); - - // log in - client.set_keyspace(keyspace); - if (username != null) - { - Map<String, String> creds = new HashMap<String, String>(); - creds.put(IAuthenticator.USERNAME_KEY, username); - creds.put(IAuthenticator.PASSWORD_KEY, password); - AuthenticationRequest authRequest = new AuthenticationRequest(creds); - client.login(authRequest); - } - - List<TokenRange> tokenRanges = client.describe_ring(keyspace); - List<KsDef> ksDefs = client.describe_keyspaces(); - - setPartitioner(client.describe_partitioner()); - Token.TokenFactory tkFactory = getPartitioner().getTokenFactory(); - - for (TokenRange tr : tokenRanges) - { - Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token)); - for (String ep : tr.endpoints) - { - addRangeForEndpoint(range, InetAddress.getByName(ep)); - } - } - - for (KsDef ksDef : ksDefs) - { - Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size()); - for (CfDef cfDef : ksDef.cf_defs) - cfs.put(cfDef.name, CFMetaData.fromThrift(cfDef)); - knownCfs.put(ksDef.name, cfs); - } - break; - } - catch (Exception e) - { - if (!hostiter.hasNext()) - throw new RuntimeException("Could not retrieve endpoint ranges: ", e); - } - } - } - - public CFMetaData getCFMetaData(String keyspace, String cfName) - { - Map<String, CFMetaData> cfs = knownCfs.get(keyspace); - return cfs != null ? 
cfs.get(cfName) : null; - } - - private static Cassandra.Client createThriftClient(String host, int port) throws TTransportException - { - TSocket socket = new TSocket(host, port); - TTransport trans = new TFramedTransport(socket); - trans.open(); - TProtocol protocol = new org.apache.thrift.protocol.TBinaryProtocol(trans); - return new Cassandra.Client(protocol); - } - } - - static class NullOutputHandler implements OutputHandler - { - public void output(String msg) {} - public void debug(String msg) {} - public void warn(String msg) {} - public void warn(String msg, Throwable th) {} - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java index b2c8fbf..e894996 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java @@ -89,7 +89,7 @@ public class CqlConfigHelper private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites"; private static final String OUTPUT_CQL = "cassandra.output.cql"; - + /** * Set the CQL columns for the input of this job. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java index f8613ba..0d09ca2 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java @@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.*; /** * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific * OutputFormat that allows reduce tasks to store keys (and corresponding - * binded variable values) as CQL rows (and respective columns) in a given + * bound variable values) as CQL rows (and respective columns) in a given * ColumnFamily. * * <p> http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index 1b407c5..ae8300c 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.io.sstable; +import java.io.Closeable; import java.io.File; import java.io.FilenameFilter; import java.io.IOException; @@ -34,7 +35,7 @@ import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.CounterId; import org.apache.cassandra.utils.Pair; -public abstract class AbstractSSTableSimpleWriter +public abstract class AbstractSSTableSimpleWriter implements Closeable { protected final File directory; protected final CFMetaData metadata; @@ -162,13 +163,6 @@ public abstract class AbstractSSTableSimpleWriter } /** - * Close this writer. 
- * This method should be called, otherwise the produced sstables are not - * guaranteed to be complete (and won't be in practice). - */ - public abstract void close() throws IOException; - - /** * Package protected for use by AbstractCQLSSTableWriter. * Not meant to be exposed publicly. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/88ad4f45/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 6993b19..427d2d4 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.io.sstable; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -71,7 +72,7 @@ import org.apache.cassandra.utils.Pair; * writer.close(); * </pre> */ -public class CQLSSTableWriter +public class CQLSSTableWriter implements Closeable { private final AbstractSSTableSimpleWriter writer; private final UpdateStatement insert;
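----------------------------------------------------------------------
Illustrative usage (not part of this commit): a minimal sketch of how a Hadoop job
might be pointed at the bulk output path refactored above. The ConfigHelper setters
are assumed to mirror the getters used in AbstractBulkRecordWriter/BulkRecordWriter,
and the keyspace, table, contact point, and job name below are placeholder values.

    import java.nio.ByteBuffer;
    import java.util.List;

    import org.apache.cassandra.hadoop.BulkOutputFormat;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class BulkLoadJobExample
    {
        public static void main(String[] args) throws Exception
        {
            Configuration conf = new Configuration();

            // Keyspace and column family the reducers will write sstables for
            ConfigHelper.setOutputColumnFamily(conf, "my_keyspace", "my_table");

            // Contact point and rpc port used by ExternalClient to discover the ring
            ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
            ConfigHelper.setOutputRpcPort(conf, "9160");
            ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner");

            // Optional knobs read by AbstractBulkRecordWriter
            conf.set("mapreduce.output.bulkoutputformat.buffersize", "64");
            conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "0");

            Job job = Job.getInstance(conf, "bulk-load-example");
            job.setOutputFormatClass(BulkOutputFormat.class);
            job.setOutputKeyClass(ByteBuffer.class);
            job.setOutputValueClass(List.class);
            // ... configure a mapper/reducer that emits <ByteBuffer, List<Mutation>> pairs ...
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }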
