http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java deleted file mode 100644 index 92e3829..0000000 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.hadoop; - - -import java.io.*; -import java.nio.ByteBuffer; -import java.util.*; - -import org.slf4j.*; - -import org.apache.cassandra.auth.*; -import org.apache.cassandra.thrift.*; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.thrift.protocol.*; -import org.apache.thrift.transport.*; - -/** - * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific - * OutputFormat that allows reduce tasks to store keys (and corresponding - * values) as Cassandra rows (and respective columns) in a given - * ColumnFamily. - * - * <p> - * As is the case with the {@link ColumnFamilyInputFormat}, you need to set the - * Keyspace and ColumnFamily in your - * Hadoop job Configuration. The {@link ConfigHelper} class, through its - * {@link ConfigHelper#setOutputColumnFamily} method, is provided to make this - * simple. - * </p> - * - * <p> - * For the sake of performance, this class employs a lazy write-back caching - * mechanism, where its record writer batches mutations created based on the - * reduce's inputs (in a task-specific map), and periodically makes the changes - * official by sending a batch mutate request to Cassandra. - * </p> - */ -@Deprecated -public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>> - implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>> -{ - public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold"; - public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size"; - - private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class); - - /** - * The OutputCommitter for this format does not write any data to the DFS. - * - * @param context - * the task context - * @return an output committer - * @throws IOException - * @throws InterruptedException - */ - public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException - { - return new NullOutputCommitter(); - } - - /** - * Check for validity of the output-specification for the job. 
- * - * @param context - * information about the job - */ - public void checkOutputSpecs(JobContext context) - { - checkOutputSpecs(HadoopCompat.getConfiguration(context)); - } - - protected void checkOutputSpecs(Configuration conf) - { - if (ConfigHelper.getOutputKeyspace(conf) == null) - throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()"); - if (ConfigHelper.getOutputPartitioner(conf) == null) - throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster"); - if (ConfigHelper.getOutputInitialAddress(conf) == null) - throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node"); - } - - /** Fills the deprecated OutputFormat interface for streaming. */ - @Deprecated - public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException - { - checkOutputSpecs(job); - } - - /** - * Connects to the given server:port and returns a client based on the given socket that points to the configured - * keyspace, and is logged in with the configured credentials. - * - * @param host fully qualified host name to connect to - * @param port RPC port of the server - * @param conf a job configuration - * @return a cassandra client - * @throws Exception set of thrown exceptions may be implementation defined, - * depending on the used transport factory - */ - @SuppressWarnings("resource") - public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception - { - logger.debug("Creating authenticated client for CF output format"); - TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port); - TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true); - Cassandra.Client client = new Cassandra.Client(binaryProtocol); - client.set_keyspace(ConfigHelper.getOutputKeyspace(conf)); - String user = ConfigHelper.getOutputKeyspaceUserName(conf); - String password = ConfigHelper.getOutputKeyspacePassword(conf); - if ((user != null) && (password != null)) - login(user, password, client); - - logger.debug("Authenticated client for CF output format created successfully"); - return client; - } - - public static void login(String user, String password, Cassandra.Client client) throws Exception - { - Map<String, String> creds = new HashMap<String, String>(); - creds.put(PasswordAuthenticator.USERNAME_KEY, user); - creds.put(PasswordAuthenticator.PASSWORD_KEY, password); - AuthenticationRequest authRequest = new AuthenticationRequest(creds); - client.login(authRequest); - } - - /** Fills the deprecated OutputFormat interface for streaming. */ - @Deprecated - public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) - { - return new ColumnFamilyRecordWriter(job, progress); - } - - /** - * Get the {@link RecordWriter} for the given task. - * - * @param context - * the information about the current task. - * @return a {@link RecordWriter} to write the output for the job. - */ - public ColumnFamilyRecordWriter getRecordWriter(final TaskAttemptContext context) throws InterruptedException - { - return new ColumnFamilyRecordWriter(context); - } - - /** - * An {@link OutputCommitter} that does nothing. 
- */ - private static class NullOutputCommitter extends OutputCommitter - { - public void abortTask(TaskAttemptContext taskContext) { } - - public void cleanupJob(JobContext jobContext) { } - - public void commitTask(TaskAttemptContext taskContext) { } - - public boolean needsTaskCommit(TaskAttemptContext taskContext) - { - return false; - } - - public void setupJob(JobContext jobContext) { } - - public void setupTask(TaskAttemptContext taskContext) { } - } - -}
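For reference, the job wiring that the deleted ColumnFamilyOutputFormat expected looked roughly like the sketch below. This is a minimal, illustrative sketch and not part of this patch: the ConfigHelper setter names are inferred from the Javadoc and the checkOutputSpecs() messages above, and the class name, keyspace, table, and address values are placeholders.

    // Hedged sketch: a driver wired to the (now removed) Thrift-based output format.
    // Setter names assumed from ConfigHelper as referenced above; values are placeholders.
    import java.nio.ByteBuffer;
    import java.util.List;
    import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class LegacyCfOutputJob
    {
        public static void main(String[] args) throws Exception
        {
            Job job = Job.getInstance(new Configuration(), "thrift-cf-output");
            job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
            job.setOutputKeyClass(ByteBuffer.class);   // row key
            job.setOutputValueClass(List.class);       // List<Mutation> per key

            Configuration conf = job.getConfiguration();
            ConfigHelper.setOutputColumnFamily(conf, "my_keyspace", "my_table");  // placeholder keyspace/table
            ConfigHelper.setOutputInitialAddress(conf, "10.0.0.1");               // placeholder: any live Cassandra node
            ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner");        // must match the cluster's partitioner
            conf.setInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 64);                 // per-endpoint mutation queue depth
            conf.setLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);           // mutations buffered per batch_mutate

            // a real job would also set a mapper/reducer emitting <ByteBuffer, List<Mutation>>
            job.waitForCompletion(true);
        }
    }

Jobs written this way need to move to the CQL-based CqlOutputFormat/CqlRecordWriter, which this commit keeps and adjusts further down.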
http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java deleted file mode 100644 index aee730d..0000000 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java +++ /dev/null @@ -1,615 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.hadoop; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.util.*; - -import com.google.common.collect.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.schema.LegacySchemaTables; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.CompositeType; -import org.apache.cassandra.db.marshal.TypeParser; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.thrift.TException; -import org.apache.thrift.transport.TTransport; - -@Deprecated -public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> - implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, ColumnFamilyRecordReader.Column>> -{ - private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyRecordReader.class); - - public static final int CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192; - - private ColumnFamilySplit split; - private RowIterator iter; - private Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> currentRow; - private SlicePredicate predicate; - private boolean isEmptyPredicate; - private int totalRowCount; // total number of rows to fetch - private int batchSize; // fetch this many per batch - private String keyspace; - private String cfName; - private Cassandra.Client client; - private ConsistencyLevel consistencyLevel; - private int keyBufferSize = 8192; - private List<IndexExpression> filter; - - - public ColumnFamilyRecordReader() - { - this(ColumnFamilyRecordReader.CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT); - } - - public 
ColumnFamilyRecordReader(int keyBufferSize) - { - super(); - this.keyBufferSize = keyBufferSize; - } - - @SuppressWarnings("resource") - public void close() - { - if (client != null) - { - TTransport transport = client.getOutputProtocol().getTransport(); - if (transport.isOpen()) - transport.close(); - } - } - - public ByteBuffer getCurrentKey() - { - return currentRow.left; - } - - public SortedMap<ByteBuffer, Column> getCurrentValue() - { - return currentRow.right; - } - - public float getProgress() - { - if (!iter.hasNext()) - return 1.0F; - - // the progress is likely to be reported slightly off the actual but close enough - float progress = ((float) iter.rowsRead() / totalRowCount); - return progress > 1.0F ? 1.0F : progress; - } - - static boolean isEmptyPredicate(SlicePredicate predicate) - { - if (predicate == null) - return true; - - if (predicate.isSetColumn_names() && predicate.getSlice_range() == null) - return false; - - if (predicate.getSlice_range() == null) - return true; - - byte[] start = predicate.getSlice_range().getStart(); - if ((start != null) && (start.length > 0)) - return false; - - byte[] finish = predicate.getSlice_range().getFinish(); - if ((finish != null) && (finish.length > 0)) - return false; - - return true; - } - - public void initialize(InputSplit split, TaskAttemptContext context) throws IOException - { - this.split = (ColumnFamilySplit) split; - Configuration conf = HadoopCompat.getConfiguration(context); - KeyRange jobRange = ConfigHelper.getInputKeyRange(conf); - filter = jobRange == null ? null : jobRange.row_filter; - predicate = ConfigHelper.getInputSlicePredicate(conf); - boolean widerows = ConfigHelper.getInputIsWide(conf); - isEmptyPredicate = isEmptyPredicate(predicate); - totalRowCount = (this.split.getLength() < Long.MAX_VALUE) - ? (int) this.split.getLength() - : ConfigHelper.getInputSplitSize(conf); - batchSize = ConfigHelper.getRangeBatchSize(conf); - cfName = ConfigHelper.getInputColumnFamily(conf); - consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getReadConsistencyLevel(conf)); - keyspace = ConfigHelper.getInputKeyspace(conf); - - if (batchSize < 2) - throw new IllegalArgumentException("Minimum batchSize is 2. Suggested batchSize is 100 or more"); - - try - { - if (client != null) - return; - - // create connection using thrift - String location = getLocation(); - - int port = ConfigHelper.getInputRpcPort(conf); - client = ColumnFamilyInputFormat.createAuthenticatedClient(location, port, conf); - - } - catch (Exception e) - { - throw new RuntimeException(e); - } - - iter = widerows ? new WideRowIterator() : new StaticRowIterator(); - logger.debug("created {}", iter); - } - - public boolean nextKeyValue() throws IOException - { - if (!iter.hasNext()) - { - logger.debug("Finished scanning {} rows (estimate was: {})", iter.rowsRead(), totalRowCount); - return false; - } - - currentRow = iter.next(); - return true; - } - - // we don't use endpointsnitch since we are trying to support hadoop nodes that are - // not necessarily on Cassandra machines, too. This should be adequate for single-DC clusters, at least. 
- private String getLocation() - { - Collection<InetAddress> localAddresses = FBUtilities.getAllLocalAddresses(); - - for (InetAddress address : localAddresses) - { - for (String location : split.getLocations()) - { - InetAddress locationAddress = null; - try - { - locationAddress = InetAddress.getByName(location); - } - catch (UnknownHostException e) - { - throw new AssertionError(e); - } - if (address.equals(locationAddress)) - { - return location; - } - } - } - return split.getLocations()[0]; - } - - private abstract class RowIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>> - { - protected List<KeySlice> rows; - protected int totalRead = 0; - protected final boolean isSuper; - protected final AbstractType<?> comparator; - protected final AbstractType<?> subComparator; - protected final IPartitioner partitioner; - - private RowIterator() - { - CfDef cfDef = new CfDef(); - try - { - partitioner = FBUtilities.newPartitioner(client.describe_partitioner()); - // get CF meta data - String query = String.format("SELECT comparator, subcomparator, type " + - "FROM %s.%s " + - "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'", - SystemKeyspace.NAME, - LegacySchemaTables.COLUMNFAMILIES, - keyspace, - cfName); - - CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE); - - Iterator<CqlRow> iteraRow = result.rows.iterator(); - - if (iteraRow.hasNext()) - { - CqlRow cqlRow = iteraRow.next(); - cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(0).value); - ByteBuffer subComparator = cqlRow.columns.get(1).value; - if (subComparator != null) - cfDef.subcomparator_type = ByteBufferUtil.string(subComparator); - - ByteBuffer type = cqlRow.columns.get(2).value; - if (type != null) - cfDef.column_type = ByteBufferUtil.string(type); - } - - comparator = TypeParser.parse(cfDef.comparator_type); - subComparator = cfDef.subcomparator_type == null ? 
null : TypeParser.parse(cfDef.subcomparator_type); - } - catch (ConfigurationException e) - { - throw new RuntimeException("unable to load sub/comparator", e); - } - catch (TException e) - { - throw new RuntimeException("error communicating via Thrift", e); - } - catch (Exception e) - { - throw new RuntimeException("unable to load keyspace " + keyspace, e); - } - isSuper = "Super".equalsIgnoreCase(cfDef.column_type); - } - - /** - * @return total number of rows read by this record reader - */ - public int rowsRead() - { - return totalRead; - } - - protected List<Pair<ByteBuffer, Column>> unthriftify(ColumnOrSuperColumn cosc) - { - if (cosc.counter_column != null) - return Collections.singletonList(unthriftifyCounter(cosc.counter_column)); - if (cosc.counter_super_column != null) - return unthriftifySuperCounter(cosc.counter_super_column); - if (cosc.super_column != null) - return unthriftifySuper(cosc.super_column); - assert cosc.column != null; - return Collections.singletonList(unthriftifySimple(cosc.column)); - } - - private List<Pair<ByteBuffer, Column>> unthriftifySuper(SuperColumn super_column) - { - List<Pair<ByteBuffer, Column>> columns = new ArrayList<>(super_column.columns.size()); - for (org.apache.cassandra.thrift.Column column : super_column.columns) - { - Pair<ByteBuffer, Column> c = unthriftifySimple(column); - columns.add(Pair.create(CompositeType.build(super_column.name, c.left), c.right)); - } - return columns; - } - - protected Pair<ByteBuffer, Column> unthriftifySimple(org.apache.cassandra.thrift.Column column) - { - return Pair.create(column.name, Column.fromRegularColumn(column)); - } - - private Pair<ByteBuffer, Column> unthriftifyCounter(CounterColumn column) - { - return Pair.create(column.name, Column.fromCounterColumn(column)); - } - - private List<Pair<ByteBuffer, Column>> unthriftifySuperCounter(CounterSuperColumn super_column) - { - List<Pair<ByteBuffer, Column>> columns = new ArrayList<>(super_column.columns.size()); - for (CounterColumn column : super_column.columns) - { - Pair<ByteBuffer, Column> c = unthriftifyCounter(column); - columns.add(Pair.create(CompositeType.build(super_column.name, c.left), c.right)); - } - return columns; - } - } - - private class StaticRowIterator extends RowIterator - { - protected int i = 0; - - private void maybeInit() - { - // check if we need another batch - if (rows != null && i < rows.size()) - return; - - String startToken; - if (totalRead == 0) - { - // first request - startToken = split.getStartToken(); - } - else - { - startToken = partitioner.getTokenFactory().toString(partitioner.getToken(Iterables.getLast(rows).key)); - if (startToken.equals(split.getEndToken())) - { - // reached end of the split - rows = null; - return; - } - } - - KeyRange keyRange = new KeyRange(batchSize) - .setStart_token(startToken) - .setEnd_token(split.getEndToken()) - .setRow_filter(filter); - try - { - rows = client.get_range_slices(new ColumnParent(cfName), predicate, keyRange, consistencyLevel); - - // nothing new? 
reached the end - if (rows.isEmpty()) - { - rows = null; - return; - } - - // remove ghosts when fetching all columns - if (isEmptyPredicate) - { - Iterator<KeySlice> it = rows.iterator(); - KeySlice ks; - do - { - ks = it.next(); - if (ks.getColumnsSize() == 0) - { - it.remove(); - } - } while (it.hasNext()); - - // all ghosts, spooky - if (rows.isEmpty()) - { - // maybeInit assumes it can get the start-with key from the rows collection, so add back the last - rows.add(ks); - maybeInit(); - return; - } - } - - // reset to iterate through this new batch - i = 0; - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext() - { - maybeInit(); - if (rows == null) - return endOfData(); - - totalRead++; - KeySlice ks = rows.get(i++); - AbstractType<?> comp = isSuper ? CompositeType.getInstance(comparator, subComparator) : comparator; - SortedMap<ByteBuffer, Column> map = new TreeMap<>(comp); - for (ColumnOrSuperColumn cosc : ks.columns) - { - List<Pair<ByteBuffer, Column>> columns = unthriftify(cosc); - for (Pair<ByteBuffer, Column> column : columns) - map.put(column.left, column.right); - } - return Pair.create(ks.key, map); - } - } - - private class WideRowIterator extends RowIterator - { - private PeekingIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>> wideColumns; - private ByteBuffer lastColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER; - private ByteBuffer lastCountedKey = ByteBufferUtil.EMPTY_BYTE_BUFFER; - - private void maybeInit() - { - if (wideColumns != null && wideColumns.hasNext()) - return; - - KeyRange keyRange; - if (totalRead == 0) - { - String startToken = split.getStartToken(); - keyRange = new KeyRange(batchSize) - .setStart_token(startToken) - .setEnd_token(split.getEndToken()) - .setRow_filter(filter); - } - else - { - KeySlice lastRow = Iterables.getLast(rows); - logger.debug("Starting with last-seen row {}", lastRow.key); - keyRange = new KeyRange(batchSize) - .setStart_key(lastRow.key) - .setEnd_token(split.getEndToken()) - .setRow_filter(filter); - } - - try - { - rows = client.get_paged_slice(cfName, keyRange, lastColumn, consistencyLevel); - int n = 0; - for (KeySlice row : rows) - n += row.columns.size(); - logger.debug("read {} columns in {} rows for {} starting with {}", - new Object[]{ n, rows.size(), keyRange, lastColumn }); - - wideColumns = Iterators.peekingIterator(new WideColumnIterator(rows)); - if (wideColumns.hasNext() && wideColumns.peek().right.keySet().iterator().next().equals(lastColumn)) - wideColumns.next(); - if (!wideColumns.hasNext()) - rows = null; - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext() - { - maybeInit(); - if (rows == null) - return endOfData(); - - Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next = wideColumns.next(); - lastColumn = next.right.keySet().iterator().next().duplicate(); - - maybeIncreaseRowCounter(next); - return next; - } - - - /** - * Increases the row counter only if we really moved to the next row. 
- * @param next just fetched row slice - */ - private void maybeIncreaseRowCounter(Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> next) - { - ByteBuffer currentKey = next.left; - if (!currentKey.equals(lastCountedKey)) - { - totalRead++; - lastCountedKey = currentKey; - } - } - - private class WideColumnIterator extends AbstractIterator<Pair<ByteBuffer, SortedMap<ByteBuffer, Column>>> - { - private final Iterator<KeySlice> rows; - private Iterator<ColumnOrSuperColumn> columns; - public KeySlice currentRow; - - public WideColumnIterator(List<KeySlice> rows) - { - this.rows = rows.iterator(); - if (this.rows.hasNext()) - nextRow(); - else - columns = Iterators.emptyIterator(); - } - - private void nextRow() - { - currentRow = rows.next(); - columns = currentRow.columns.iterator(); - } - - protected Pair<ByteBuffer, SortedMap<ByteBuffer, Column>> computeNext() - { - AbstractType<?> comp = isSuper ? CompositeType.getInstance(comparator, subComparator) : comparator; - while (true) - { - if (columns.hasNext()) - { - ColumnOrSuperColumn cosc = columns.next(); - SortedMap<ByteBuffer, Column> map; - List<Pair<ByteBuffer, Column>> columns = unthriftify(cosc); - if (columns.size() == 1) - { - map = ImmutableSortedMap.of(columns.get(0).left, columns.get(0).right); - } - else - { - assert isSuper; - map = new TreeMap<>(comp); - for (Pair<ByteBuffer, Column> column : columns) - map.put(column.left, column.right); - } - return Pair.create(currentRow.key, map); - } - - if (!rows.hasNext()) - return endOfData(); - - nextRow(); - } - } - } - } - - // Because the old Hadoop API wants us to write to the key and value - // and the new asks for them, we need to copy the output of the new API - // to the old. Thus, expect a small performance hit. - // And obviously this wouldn't work for wide rows. But since ColumnFamilyInputFormat - // and ColumnFamilyRecordReader don't support them, it should be fine for now. 
- public boolean next(ByteBuffer key, SortedMap<ByteBuffer, Column> value) throws IOException - { - if (this.nextKeyValue()) - { - key.clear(); - key.put(this.getCurrentKey().duplicate()); - key.flip(); - - value.clear(); - value.putAll(this.getCurrentValue()); - - return true; - } - return false; - } - - public ByteBuffer createKey() - { - return ByteBuffer.wrap(new byte[this.keyBufferSize]); - } - - public SortedMap<ByteBuffer, Column> createValue() - { - return new TreeMap<>(); - } - - public long getPos() throws IOException - { - return iter.rowsRead(); - } - - public static final class Column - { - public final ByteBuffer name; - public final ByteBuffer value; - public final long timestamp; - - private Column(ByteBuffer name, ByteBuffer value, long timestamp) - { - this.name = name; - this.value = value; - this.timestamp = timestamp; - } - - static Column fromRegularColumn(org.apache.cassandra.thrift.Column input) - { - return new Column(input.name, input.value, input.timestamp); - } - - static Column fromCounterColumn(org.apache.cassandra.thrift.CounterColumn input) - { - return new Column(input.name, ByteBufferUtil.bytes(input.value), 0); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java deleted file mode 100644 index 9547d0e..0000000 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java +++ /dev/null @@ -1,341 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.hadoop; - - -import java.io.IOException; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.*; - -import org.apache.cassandra.client.*; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.thrift.*; -import org.apache.cassandra.thrift.ConsistencyLevel; -import org.apache.cassandra.utils.*; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.*; -import org.apache.thrift.TException; -import org.apache.hadoop.util.Progressable; -import org.apache.thrift.transport.*; - - -/** - * The <code>ColumnFamilyRecordWriter</code> maps the output <key, value> - * pairs to a Cassandra column family. In particular, it applies all mutations - * in the value, which it associates with the key, and in turn the responsible - * endpoint. - * - * <p> - * Furthermore, this writer groups the mutations by the endpoint responsible for - * the rows being affected. 
This allows the mutations to be executed in parallel, - * directly to a responsible endpoint. - * </p> - * - * @see ColumnFamilyOutputFormat - */ -@Deprecated -final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>> implements - org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>> -{ - // The configuration this writer is associated with. - protected final Configuration conf; - - // The number of mutations to buffer per endpoint - protected final int queueSize; - - protected final long batchThreshold; - - protected final ConsistencyLevel consistencyLevel; - protected Progressable progressable; - protected TaskAttemptContext context; - // handles for clients for each range running in the threadpool - private final Map<Range, RangeClient> clients; - - // The ring cache that describes the token ranges each node in the ring is - // responsible for. This is what allows us to group the mutations by - // the endpoints they should be targeted at. The targeted endpoint - // essentially - // acts as the primary replica for the rows being affected by the mutations. - private final RingCache ringCache; - - /** - * Upon construction, obtain the map that this writer will use to collect - * mutations, and the ring cache for the given keyspace. - * - * @param context the task attempt context - * @throws IOException - */ - ColumnFamilyRecordWriter(TaskAttemptContext context) - { - this(HadoopCompat.getConfiguration(context)); - this.context = context; - - } - ColumnFamilyRecordWriter(Configuration conf, Progressable progressable) - { - this(conf); - this.progressable = progressable; - } - - ColumnFamilyRecordWriter(Configuration conf) - { - this.conf = conf; - this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors()); - batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32); - consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf)); - this.ringCache = new RingCache(conf); - this.clients = new HashMap<Range, RangeClient>(); - } - - /** - * Close this <code>RecordWriter</code> to future operations, but not before - * flushing out the batched mutations. - * - * @param context the context of the task - * @throws IOException - */ - public void close(TaskAttemptContext context) throws IOException - { - close(); - } - - /** Fills the deprecated RecordWriter interface for streaming. */ - @Deprecated - public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException - { - close(); - } - - public void close() throws IOException - { - // close all the clients before throwing anything - IOException clientException = null; - for (RangeClient client : clients.values()) - { - try - { - client.close(); - } - catch (IOException e) - { - clientException = e; - } - } - if (clientException != null) - throw clientException; - } - - /** - * If the key is to be associated with a valid value, a mutation is created - * for it with the given column family and columns. In the event the value - * in the column is missing (i.e., null), then it is marked for - * {@link Deletion}. Similarly, if the entire value for a key is missing - * (i.e., null), then the entire key is marked for {@link Deletion}. - * </p> - * - * @param keybuff - * the key to write. - * @param value - * the value to write. 
- * @throws IOException - */ - @Override - public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException - { - Range<Token> range = ringCache.getRange(keybuff); - - // get the client for the given range, or create a new one - RangeClient client = clients.get(range); - if (client == null) - { - // haven't seen keys for this range: create new client - client = new RangeClient(ringCache.getEndpoint(range)); - client.start(); - clients.put(range, client); - } - - for (Mutation amut : value) - client.put(Pair.create(keybuff, amut)); - if (progressable != null) - progressable.progress(); - if (context != null) - HadoopCompat.progress(context); - } - - /** - * A client that runs in a threadpool and connects to the list of endpoints for a particular - * range. Mutations for keys in that range are sent to this client via a queue. - */ - public class RangeClient extends Thread - { - // The list of endpoints for this range - protected final List<InetAddress> endpoints; - // A bounded queue of incoming mutations for this range - protected final BlockingQueue<Pair<ByteBuffer, Mutation>> queue = new ArrayBlockingQueue<>(queueSize); - - protected volatile boolean run = true; - // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing - // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls, - // when the client is closed. - protected volatile IOException lastException; - - protected Cassandra.Client client; - public final String columnFamily = ConfigHelper.getOutputColumnFamily(conf); - - /** - * Constructs an {@link RangeClient} for the given endpoints. - * @param endpoints the possible endpoints to execute the mutations on - */ - public RangeClient(List<InetAddress> endpoints) - { - super("client-" + endpoints); - this.endpoints = endpoints; - } - - /** - * enqueues the given value to Cassandra - */ - public void put(Pair<ByteBuffer, Mutation> value) throws IOException - { - while (true) - { - if (lastException != null) - throw lastException; - try - { - if (queue.offer(value, 100, TimeUnit.MILLISECONDS)) - break; - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - } - } - - public void close() throws IOException - { - // stop the run loop. this will result in closeInternal being called by the time join() finishes. 
- run = false; - interrupt(); - try - { - this.join(); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - - if (lastException != null) - throw lastException; - } - - @SuppressWarnings("resource") - protected void closeInternal() - { - if (client != null) - { - TTransport transport = client.getOutputProtocol().getTransport(); - if (transport.isOpen()) - transport.close(); - } - } - - /** - * Loops collecting mutations from the queue and sending to Cassandra - */ - public void run() - { - outer: - while (run || !queue.isEmpty()) - { - Pair<ByteBuffer, Mutation> mutation; - try - { - mutation = queue.take(); - } - catch (InterruptedException e) - { - // re-check loop condition after interrupt - continue; - } - - Map<ByteBuffer, Map<String, List<Mutation>>> batch = new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); - while (mutation != null) - { - Map<String, List<Mutation>> subBatch = batch.get(mutation.left); - if (subBatch == null) - { - subBatch = Collections.singletonMap(columnFamily, (List<Mutation>) new ArrayList<Mutation>()); - batch.put(mutation.left, subBatch); - } - - subBatch.get(columnFamily).add(mutation.right); - if (batch.size() >= batchThreshold) - break; - - mutation = queue.poll(); - } - - Iterator<InetAddress> iter = endpoints.iterator(); - while (true) - { - // send the mutation to the last-used endpoint. first time through, this will NPE harmlessly. - try - { - client.batch_mutate(batch, consistencyLevel); - break; - } - catch (Exception e) - { - closeInternal(); - if (!iter.hasNext()) - { - lastException = new IOException(e); - break outer; - } - } - - // attempt to connect to a different endpoint - try - { - InetAddress address = iter.next(); - String host = address.getHostName(); - int port = ConfigHelper.getOutputRpcPort(conf); - client = ColumnFamilyOutputFormat.createAuthenticatedClient(host, port, conf); - } - catch (Exception e) - { - closeInternal(); - // TException means something unexpected went wrong to that endpoint, so - // we should try again to another. Other exceptions (auth or invalid request) are fatal. 
- if ((!(e instanceof TException)) || !iter.hasNext()) - { - lastException = new IOException(e); - break outer; - } - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java index e77c4c8..3e69c2d 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java @@ -33,7 +33,6 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.hadoop.BulkRecordWriter; import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.hadoop.HadoopCompat; import org.apache.cassandra.io.sstable.CQLSSTableWriter; @@ -41,6 +40,7 @@ import org.apache.cassandra.io.sstable.SSTableLoader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.streaming.StreamState; import org.apache.cassandra.utils.NativeSSTableLoaderClient; +import org.apache.cassandra.utils.OutputHandler; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -152,7 +152,7 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>> ExternalClient externalClient = new ExternalClient(conf); externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace)); - loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) + loader = new SSTableLoader(outputDir, externalClient, new NullOutputHandler()) { @Override public void onSuccess(StreamState finalState) @@ -287,4 +287,12 @@ public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>> return addresses; } } + + public static class NullOutputHandler implements OutputHandler + { + public void output(String msg) {} + public void debug(String msg) {} + public void warn(String msg) {} + public void warn(String msg, Throwable th) {} + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java index 09bd80c..70429a8 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java @@ -18,12 +18,44 @@ package org.apache.cassandra.hadoop.cql3; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; -import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.Host; 
+import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.TokenRange; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.OrderPreservingPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.hadoop.ColumnFamilySplit; +import org.apache.cassandra.hadoop.ConfigHelper; +import org.apache.cassandra.hadoop.HadoopCompat; +import org.apache.cassandra.thrift.KeyRange; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; @@ -48,8 +80,15 @@ import com.datastax.driver.core.Row; * * other native protocol connection parameters in CqlConfigHelper */ -public class CqlInputFormat extends AbstractColumnFamilyInputFormat<Long, Row> +public class CqlInputFormat extends org.apache.hadoop.mapreduce.InputFormat<Long, Row> implements org.apache.hadoop.mapred.InputFormat<Long, Row> { + public static final String MAPRED_TASK_ID = "mapred.task.id"; + private static final Logger logger = LoggerFactory.getLogger(CqlInputFormat.class); + private String keyspace; + private String cfName; + private IPartitioner partitioner; + private Session session; + public RecordReader<Long, Row> getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException { @@ -75,4 +114,238 @@ public class CqlInputFormat extends AbstractColumnFamilyInputFormat<Long, Row> return new CqlRecordReader(); } + protected void validateConfiguration(Configuration conf) + { + if (ConfigHelper.getInputKeyspace(conf) == null || ConfigHelper.getInputColumnFamily(conf) == null) + { + throw new UnsupportedOperationException("you must set the keyspace and table with setInputColumnFamily()"); + } + if (ConfigHelper.getInputInitialAddress(conf) == null) + throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node with setInputInitialAddress"); + if (ConfigHelper.getInputPartitioner(conf) == null) + throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner"); + } + + public List<org.apache.hadoop.mapreduce.InputSplit> getSplits(JobContext context) throws IOException + { + Configuration conf = HadoopCompat.getConfiguration(context); + + validateConfiguration(conf); + + keyspace = ConfigHelper.getInputKeyspace(conf); + cfName = ConfigHelper.getInputColumnFamily(conf); + partitioner = ConfigHelper.getInputPartitioner(conf); + logger.debug("partitioner is {}", partitioner); + + // canonical ranges and nodes holding replicas + Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(conf, keyspace); + + // canonical ranges, split into pieces, fetching the splits in parallel + ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); + List<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>(); + + try + { + List<Future<List<org.apache.hadoop.mapreduce.InputSplit>>> splitfutures = new ArrayList<>(); + KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf); 
+ Range<Token> jobRange = null; + if (jobKeyRange != null) + { + if (jobKeyRange.start_key != null) + { + if (!partitioner.preservesOrder()) + throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving partitioner"); + if (jobKeyRange.start_token != null) + throw new IllegalArgumentException("only start_key supported"); + if (jobKeyRange.end_token != null) + throw new IllegalArgumentException("only start_key supported"); + jobRange = new Range<>(partitioner.getToken(jobKeyRange.start_key), + partitioner.getToken(jobKeyRange.end_key)); + } + else if (jobKeyRange.start_token != null) + { + jobRange = new Range<>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token), + partitioner.getTokenFactory().fromString(jobKeyRange.end_token)); + } + else + { + logger.warn("ignoring jobKeyRange specified without start_key or start_token"); + } + } + + session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect(); + Metadata metadata = session.getCluster().getMetadata(); + + for (TokenRange range : masterRangeNodes.keySet()) + { + if (jobRange == null) + { + // for each tokenRange, pick a live owner and ask it to compute bite-sized splits + splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf))); + } + else + { + TokenRange jobTokenRange = rangeToTokenRange(metadata, jobRange); + if (range.intersects(jobTokenRange)) + { + for (TokenRange intersection: range.intersectWith(jobTokenRange)) + { + // for each tokenRange, pick a live owner and ask it to compute bite-sized splits + splitfutures.add(executor.submit(new SplitCallable(intersection, masterRangeNodes.get(range), conf))); + } + } + } + } + + // wait until we have all the results back + for (Future<List<org.apache.hadoop.mapreduce.InputSplit>> futureInputSplits : splitfutures) + { + try + { + splits.addAll(futureInputSplits.get()); + } + catch (Exception e) + { + throw new IOException("Could not get input splits", e); + } + } + } + finally + { + executor.shutdownNow(); + } + + assert splits.size() > 0; + Collections.shuffle(splits, new Random(System.nanoTime())); + return splits; + } + + private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range) + { + return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)), + metadata.newToken(partitioner.getTokenFactory().toString(range.right))); + } + + private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException + { + int splitSize = ConfigHelper.getInputSplitSize(conf); + try + { + return describeSplits(keyspace, cfName, range, splitSize); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace) + { + try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect()) + { + Map<TokenRange, Set<Host>> map = new HashMap<>(); + Metadata metadata = session.getCluster().getMetadata(); + for (TokenRange tokenRange : metadata.getTokenRanges()) + map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange)); + return map; + } + } + + private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize) + { + String query = String.format("SELECT mean_partition_size, partitions_count " + + "FROM %s.%s " + + "WHERE keyspace_name = ? 
AND table_name = ? AND range_start = ? AND range_end = ?", + SystemKeyspace.NAME, + SystemKeyspace.SIZE_ESTIMATES); + + ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString()); + + Row row = resultSet.one(); + // If we have no data on this split, return the full split i.e., do not sub-split + // Assume smallest granularity of partition count available from CASSANDRA-7688 + if (row == null) + { + Map<TokenRange, Long> wrappedTokenRange = new HashMap<>(); + wrappedTokenRange.put(tokenRange, (long) 128); + return wrappedTokenRange; + } + + long meanPartitionSize = row.getLong("mean_partition_size"); + long partitionCount = row.getLong("partitions_count"); + + int splitCount = (int)((meanPartitionSize * partitionCount) / splitSize); + List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount); + Map<TokenRange, Long> rangesWithLength = new HashMap<>(); + for (TokenRange range : splitRanges) + rangesWithLength.put(range, partitionCount/splitCount); + + return rangesWithLength; + } + + // Old Hadoop API + public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException + { + TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID()); + List<org.apache.hadoop.mapreduce.InputSplit> newInputSplits = this.getSplits(tac); + InputSplit[] oldInputSplits = new InputSplit[newInputSplits.size()]; + for (int i = 0; i < newInputSplits.size(); i++) + oldInputSplits[i] = (ColumnFamilySplit)newInputSplits.get(i); + return oldInputSplits; + } + + /** + * Gets a token tokenRange and splits it up according to the suggested + * size into input splits that Hadoop can use. + */ + class SplitCallable implements Callable<List<org.apache.hadoop.mapreduce.InputSplit>> + { + + private final TokenRange tokenRange; + private final Set<Host> hosts; + private final Configuration conf; + + public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf) + { + this.tokenRange = tr; + this.hosts = hosts; + this.conf = conf; + } + + public List<org.apache.hadoop.mapreduce.InputSplit> call() throws Exception + { + ArrayList<org.apache.hadoop.mapreduce.InputSplit> splits = new ArrayList<>(); + Map<TokenRange, Long> subSplits; + subSplits = getSubSplits(keyspace, cfName, tokenRange, conf); + // turn the sub-ranges into InputSplits + String[] endpoints = new String[hosts.size()]; + + // hadoop needs hostname, not ip + int endpointIndex = 0; + for (Host endpoint : hosts) + endpoints[endpointIndex++] = endpoint.getAddress().getHostName(); + + boolean partitionerIsOpp = partitioner instanceof OrderPreservingPartitioner || partitioner instanceof ByteOrderedPartitioner; + + for (TokenRange subSplit : subSplits.keySet()) + { + List<TokenRange> ranges = subSplit.unwrap(); + for (TokenRange subrange : ranges) + { + ColumnFamilySplit split = + new ColumnFamilySplit( + partitionerIsOpp ? + subrange.getStart().toString().substring(2) : subrange.getStart().toString(), + partitionerIsOpp ? 
+ subrange.getEnd().toString().substring(2) : subrange.getStart().toString(), + subSplits.get(subSplit), + endpoints); + + logger.debug("adding {}", split); + splits.add(split); + } + } + return splits; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java index 9a1cda6..cc0a6b1 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java @@ -55,6 +55,9 @@ import org.apache.hadoop.mapreduce.*; public class CqlOutputFormat extends OutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>> implements org.apache.hadoop.mapred.OutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>> { + public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold"; + public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size"; + /** * Check for validity of the output-specification for the job. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java index 78b0494..c9198c6 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java @@ -31,7 +31,6 @@ import com.datastax.driver.core.exceptions.*; import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat; import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.hadoop.HadoopCompat; import org.apache.cassandra.utils.FBUtilities; @@ -109,8 +108,8 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuf CqlRecordWriter(Configuration conf) { this.conf = conf; - this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors()); - batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32); + this.queueSize = conf.getInt(CqlOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors()); + batchThreshold = conf.getLong(CqlOutputFormat.BATCH_THRESHOLD, 32); this.clients = new HashMap<>(); try
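The last two hunks move the batching knobs onto CqlOutputFormat: the BATCH_THRESHOLD and QUEUE_SIZE property keys previously declared by the deleted ColumnFamilyOutputFormat are now defined there, and CqlRecordWriter reads its defaults through them. A minimal sketch of tuning them from job code, with illustrative values only and assuming the same 'job'/imports as the earlier sketch:

    // Hedged sketch: the same knobs, now addressed via CqlOutputFormat's constants.
    Configuration conf = job.getConfiguration();
    conf.setInt(CqlOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors()); // mirrors the default used by CqlRecordWriter
    conf.setLong(CqlOutputFormat.BATCH_THRESHOLD, 64);                                  // illustrative; the default in the diff is 32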