Author: billie
Date: Wed Aug 15 15:13:12 2012
New Revision: 1373452
URL: http://svn.apache.org/viewvc?rev=1373452&view=rev
Log:
ACCUMULO-467 made file properties configurable for AccumuloFileOutputFormat
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java?rev=1373452&r1=1373451&r2=1373452&view=diff
==============================================================================
---
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
(original)
+++
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
Wed Aug 15 15:13:12 2012
@@ -46,7 +46,6 @@ import org.apache.hadoop.mapreduce.lib.o
public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
private static final String PREFIX =
AccumuloOutputFormat.class.getSimpleName();
public static final String FILE_TYPE = PREFIX + ".file_type";
- public static final String BLOCK_SIZE = PREFIX + ".block_size";
private static final String INSTANCE_HAS_BEEN_SET = PREFIX +
".instanceConfigured";
private static final String INSTANCE_NAME = PREFIX + ".instanceName";
@@ -61,7 +60,6 @@ public class AccumuloFileOutputFormat ex
if (extension == null || extension.isEmpty())
extension = RFile.EXTENSION;
- handleBlockSize(job.getConfiguration());
final Path file = this.getDefaultWorkFile(job, "." + extension);
return new RecordWriter<Key,Value>() {
@@ -84,28 +82,16 @@ public class AccumuloFileOutputFormat ex
};
}
- protected static void handleBlockSize(Configuration conf) {
- int blockSize;
- if (conf.getBoolean(INSTANCE_HAS_BEEN_SET, false)) {
- blockSize = (int) new ZooKeeperInstance(conf.get(INSTANCE_NAME),
conf.get(ZOOKEEPERS)).getConfiguration().getMemoryInBytes(
- Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
- } else {
- blockSize = getBlockSize(conf);
- }
- conf.setInt("io.seqfile.compress.blocksize", blockSize);
-
- }
-
public static void setFileType(Configuration conf, String type) {
conf.set(FILE_TYPE, type);
}
+ /**
+ * @deprecated since 1.5, use {@link #setCompressedBlockSize(Configuration, long)} instead
+ */
public static void setBlockSize(Configuration conf, int blockSize) {
- conf.setInt(BLOCK_SIZE, blockSize);
- }
-
- private static int getBlockSize(Configuration conf) {
- return conf.getInt(BLOCK_SIZE, (int)
AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
+ long bs = blockSize;
+ setCompressedBlockSize(conf, bs);
}
/**
@@ -130,4 +116,24 @@ public class AccumuloFileOutputFormat ex
protected static Instance getInstance(Configuration conf) {
return new ZooKeeperInstance(conf.get(INSTANCE_NAME),
conf.get(ZOOKEEPERS));
}
+
+ public static void setReplication(Configuration conf, int replication) {
+ conf.setInt(Property.TABLE_FILE_REPLICATION.getKey(), replication);
+ }
+
+ public static void setDFSBlockSize(Configuration conf, long blockSize) {
+ conf.setLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(), blockSize);
+ }
+
+ public static void setCompressedBlockSize(Configuration conf, long
cblockSize) {
+ conf.setLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(),
cblockSize);
+ }
+
+ public static void setCompressedBlockSizeIndex(Configuration conf, long
cblockSizeIndex) {
+ conf.setLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(),
cblockSizeIndex);
+ }
+
+ public static void setCompressionType(Configuration conf, String
compression) {
+ conf.set(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), compression);
+ }
}
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java?rev=1373452&r1=1373451&r2=1373452&view=diff
==============================================================================
---
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
(original)
+++
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
Wed Aug 15 15:13:12 2012
@@ -104,22 +104,23 @@ public class RFileOperations extends Fil
@Override
public FileSKVWriter openWriter(String file, FileSystem fs, Configuration
conf, AccumuloConfiguration acuconf) throws IOException {
int hrep = conf.getInt("dfs.replication", -1);
- int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
+ int trep = conf.getInt(Property.TABLE_FILE_REPLICATION.getKey(),
acuconf.getCount(Property.TABLE_FILE_REPLICATION));
int rep = hrep;
if (trep > 0 && trep != hrep) {
rep = trep;
}
long hblock = conf.getLong("dfs.block.size", 1 << 26);
- long tblock = acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
+ long tblock = conf.getLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(),
acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE));
long block = hblock;
if (tblock > 0)
block = tblock;
int bufferSize = conf.getInt("io.file.buffer.size", 4096);
- long blockSize =
acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE);
- long indexBlockSize =
acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX);
+ long blockSize =
conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(),
acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
+ long indexBlockSize =
conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(),
+
acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
- String compression = acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE);
+ String compression =
conf.get(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(),
acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new
Path(file), false, bufferSize, (short) rep, block), compression, conf);
Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int)
indexBlockSize);
Modified:
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java?rev=1373452&r1=1373451&r2=1373452&view=diff
==============================================================================
---
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
(original)
+++
accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
Wed Aug 15 15:13:12 2012
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.conf.Pro
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.ContextFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -59,17 +60,6 @@ public class AccumuloFileOutputFormatTes
}
@Test
- public void testSet() throws IOException, InterruptedException {
- AccumuloFileOutputFormat.setBlockSize(job.getConfiguration(), 300);
- validate(300);
- }
-
- @Test
- public void testUnset() throws IOException, InterruptedException {
- validate((int)
AccumuloConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE));
- }
-
- @Test
public void testEmptyWrite() throws IOException, InterruptedException {
handleWriteTests(false);
}
@@ -97,10 +87,28 @@ public class AccumuloFileOutputFormatTes
file.getFileSystem(tac.getConfiguration()).delete(file.getParent(), true);
}
- public void validate(int size) throws IOException, InterruptedException {
- AccumuloFileOutputFormat.handleBlockSize(job.getConfiguration());
- int detSize =
job.getConfiguration().getInt("io.seqfile.compress.blocksize", -1);
- assertEquals(size, detSize);
+ @Test
+ public void validateConfiguration() throws IOException, InterruptedException
{
+ Configuration conf = job.getConfiguration();
+ AccumuloConfiguration acuconf =
AccumuloConfiguration.getDefaultConfiguration();
+
+ int a = 7;
+ long b = 300l;
+ long c = 50l;
+ long d = 10l;
+ String e = "type";
+
+ AccumuloFileOutputFormat.setReplication(conf, a);
+ AccumuloFileOutputFormat.setDFSBlockSize(conf, b);
+ AccumuloFileOutputFormat.setCompressedBlockSize(conf, c);
+ AccumuloFileOutputFormat.setCompressedBlockSizeIndex(conf, d);
+ AccumuloFileOutputFormat.setCompressionType(conf, e);
+
+ assertEquals(a, conf.getInt(Property.TABLE_FILE_REPLICATION.getKey(),
acuconf.getCount(Property.TABLE_FILE_REPLICATION)));
+ assertEquals(b, conf.getLong(Property.TABLE_FILE_BLOCK_SIZE.getKey(),
acuconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE)));
+ assertEquals(c,
conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(),
acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE)));
+ assertEquals(d,
+ conf.getLong(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(),
acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX)));
+ assertEquals(e, conf.get(Property.TABLE_FILE_COMPRESSION_TYPE.getKey(),
acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE)));
}
-
}