Repository: tajo
Updated Branches:
  refs/heads/master fcc5da03a -> 0603b49dd
TAJO-999: SequenceFile key class need to be compatible. (jaehwa) Closes #110 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0603b49d Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0603b49d Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0603b49d Branch: refs/heads/master Commit: 0603b49dddb5d2b3c6f6447fa672e0ea03eae09e Parents: fcc5da0 Author: Jaehwa Jung <[email protected]> Authored: Mon Aug 11 11:36:18 2014 +0900 Committer: Jaehwa Jung <[email protected]> Committed: Mon Aug 11 11:36:18 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 ++ .../sequencefile/SequenceFileAppender.java | 20 +++++++++++--------- .../sequencefile/SequenceFileScanner.java | 20 ++++++++++++++------ .../tajo/storage/TestCompressionStorages.java | 12 ++++++++++++ .../org/apache/tajo/storage/TestStorages.java | 13 +++++++++++++ 5 files changed, 52 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0603b49d/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 864eaef..deb41a0 100644 --- a/CHANGES +++ b/CHANGES @@ -106,6 +106,8 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-999: SequenceFile key class need to be compatible. (jaehwa) + TAJO-994: 'count(distinct x)' function counts first null value. 
(hyunsik) TAJO-996: Sometimes, scheduleFetchesByEvenDistributedVolumes loses http://git-wip-us.apache.org/repos/asf/tajo/blob/0603b49d/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java index b150a9a..86d902a 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java @@ -25,11 +25,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.*; import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.tajo.catalog.Schema; @@ -73,7 +70,8 @@ public class SequenceFileAppender extends FileAppender { long rowCount; private boolean isShuffle; - private static final BytesWritable EMPTY_KEY = new BytesWritable(); + + private Writable EMPTY_KEY; public SequenceFileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException { super(conf, schema, meta, path); @@ -128,20 +126,24 @@ public class SequenceFileAppender extends FileAppender { throw new IOException(e); } - Class<? extends Writable> valueClass; + Class<? 
extends Writable> keyClass, valueClass; if (serde instanceof BinarySerializerDeserializer) { + keyClass = BytesWritable.class; + EMPTY_KEY = new BytesWritable(); valueClass = BytesWritable.class; } else { + keyClass = LongWritable.class; + EMPTY_KEY = new LongWritable(); valueClass = Text.class; } String type = this.meta.getOption(StorageConstants.COMPRESSION_TYPE, CompressionType.NONE.name()); if (type.equals(CompressionType.BLOCK.name())) { - writer = SequenceFile.createWriter(fs, conf, path, BytesWritable.class, valueClass, CompressionType.BLOCK, codec); + writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.BLOCK, codec); } else if (type.equals(CompressionType.RECORD.name())) { - writer = SequenceFile.createWriter(fs, conf, path, BytesWritable.class, valueClass, CompressionType.RECORD, codec); + writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.RECORD, codec); } else { - writer = SequenceFile.createWriter(fs, conf, path, BytesWritable.class, valueClass, CompressionType.NONE, codec); + writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.NONE, codec); } if (enabledStats) { http://git-wip-us.apache.org/repos/asf/tajo/blob/0603b49d/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java index 32d1d57..3c39841 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java @@ -24,10 +24,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.*; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; @@ -37,6 +35,7 @@ import org.apache.tajo.datum.NullDatum; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.BytesUtils; +import org.apache.tajo.util.ReflectionUtil; import java.io.IOException; @@ -72,7 +71,7 @@ public class SequenceFileScanner extends FileScanner { private int elementOffset, elementSize; - private static final BytesWritable EMPTY_KEY = new BytesWritable(); + private Writable EMPTY_KEY; public SequenceFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException { super(conf, schema, meta, fragment); @@ -120,8 +119,13 @@ public class SequenceFileScanner extends FileScanner { String serdeClass = this.meta.getOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName()); serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); - if (serde instanceof BinarySerializerDeserializer) + if (serde instanceof BinarySerializerDeserializer) { hasBinarySerDe = true; + } + + Class<? extends Writable> keyClass = (Class<? 
extends Writable>)Class.forName(reader.getKeyClassName()); + EMPTY_KEY = keyClass.newInstance(); + } catch (Exception e) { LOG.error(e.getMessage(), e); throw new IOException(e); @@ -129,6 +133,10 @@ public class SequenceFileScanner extends FileScanner { super.init(); } + public Writable getKey() { + return EMPTY_KEY; + } + private void prepareProjection(Column [] targets) { projectionMap = new int[targets.length]; http://git-wip-us.apache.org/repos/asf/tajo/blob/0603b49d/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java index 9fe5721..61f4682 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java @@ -21,7 +21,10 @@ package org.apache.tajo.storage; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.compress.*; import org.apache.hadoop.io.compress.zlib.ZlibFactory; import org.apache.hadoop.util.NativeCodeLoader; @@ -34,6 +37,7 @@ import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.sequencefile.SequenceFileScanner; import org.apache.tajo.util.CommonTestingUtil; import org.junit.Test; import org.junit.runner.RunWith; @@ -44,6 +48,7 @@ import java.util.Arrays; import java.util.Collection; import static org.junit.Assert.*; +import static 
org.junit.Assert.assertEquals; @RunWith(Parameterized.class) public class TestCompressionStorages { @@ -228,6 +233,13 @@ public class TestCompressionStorages { } } scanner.init(); + + if (storeType == StoreType.SEQUENCEFILE) { + assertTrue(scanner instanceof SequenceFileScanner); + Writable key = ((SequenceFileScanner) scanner).getKey(); + assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName()); + } + int tupleCnt = 0; Tuple tuple; while ((tuple = scanner.next()) != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/0603b49d/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java index 1cf1ecf..3bea740 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -21,6 +21,9 @@ package org.apache.tajo.storage; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; import org.apache.tajo.QueryId; import org.apache.tajo.TajoIdProtos; import org.apache.tajo.catalog.CatalogUtil; @@ -36,6 +39,7 @@ import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.ProtobufDatumFactory; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.rcfile.RCFile; +import org.apache.tajo.storage.sequencefile.SequenceFileScanner; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.KeyValueSet; @@ -48,6 +52,7 @@ import java.util.Arrays; import java.util.Collection; import static org.junit.Assert.assertEquals; +import static 
org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @RunWith(Parameterized.class) @@ -640,6 +645,10 @@ public class TestStorages { Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); scanner.init(); + assertTrue(scanner instanceof SequenceFileScanner); + Writable key = ((SequenceFileScanner) scanner).getKey(); + assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName()); + Tuple retrieved; while ((retrieved=scanner.next()) != null) { for (int i = 0; i < tuple.size(); i++) { @@ -709,6 +718,10 @@ public class TestStorages { Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); scanner.init(); + assertTrue(scanner instanceof SequenceFileScanner); + Writable key = ((SequenceFileScanner) scanner).getKey(); + assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName()); + Tuple retrieved; while ((retrieved=scanner.next()) != null) { for (int i = 0; i < tuple.size(); i++) {
