Repository: tajo
Updated Branches:
  refs/heads/master 64c31a226 -> f62d34bea
TAJO-2038: NPE in DelimitedTextFileScanner#getProgress. Closes #930 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f62d34be Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f62d34be Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f62d34be Branch: refs/heads/master Commit: f62d34beab7fba2da6a54ad88eb37fed093c37b5 Parents: 64c31a2 Author: Jinho Kim <[email protected]> Authored: Tue Jan 12 11:20:14 2016 +0900 Committer: Jinho Kim <[email protected]> Committed: Tue Jan 12 11:20:14 2016 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../apache/tajo/storage/avro/AvroScanner.java | 11 +++ .../org/apache/tajo/storage/orc/ORCScanner.java | 5 +- .../tajo/storage/parquet/ParquetScanner.java | 10 +++ .../sequencefile/SequenceFileScanner.java | 25 +++++++ .../tajo/storage/text/DelimitedTextFile.java | 76 +++++++++++--------- .../parquet/InternalParquetRecordReader.java | 2 +- .../thirdparty/parquet/ParquetReader.java | 8 +++ .../org/apache/tajo/storage/TestStorages.java | 53 ++++++++++++++ 9 files changed, 154 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index ffe3c6d..b1a85a8 100644 --- a/CHANGES +++ b/CHANGES @@ -78,6 +78,8 @@ Release 0.12.0 - unreleased BUG FIXES + TAJO-2038: NPE in FileScanner#getProgress. (jinho) + TAJO-2034: Files required for executing python functions are not copied in testEval(). 
(jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java index afa2701..ad48850 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java @@ -292,4 +292,15 @@ public class AvroScanner extends FileScanner { public boolean isSplittable() { return false; } + + @Override + public float getProgress() { + if (!inited) return super.getProgress(); + + if (!dataFileReader.hasNext()) { + return 1.0f; + } else { + return 0.0f; + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java index 32a2aaa..9351c59 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java @@ -113,8 +113,6 @@ public class ORCScanner extends FileScanner { targets = schema.toArray(); } - super.init(); - outTuple = new VTuple(targets.length); Path path = fragment.getPath(); @@ -163,6 +161,7 @@ public class ORCScanner extends FileScanner { recordReader = orcReader.createRecordReader(columnSet, OrcPredicate.TRUE, fragment.getStartKey(), fragment.getLength(), DateTimeZone.forTimeZone(timezone)); + super.init(); 
LOG.debug("file fragment { path: " + fragment.getPath() + ", start offset: " + fragment.getStartKey() + ", length: " + fragment.getLength() + "}"); @@ -307,6 +306,8 @@ public class ORCScanner extends FileScanner { @Override public float getProgress() { + if(!inited) return super.getProgress(); + return recordReader.getProgress(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java index ef74a90..d7f753c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java @@ -124,4 +124,14 @@ public class ParquetScanner extends FileScanner { public boolean isSplittable() { return false; } + + @Override + public float getProgress() { + + if (!inited) { + return super.getProgress(); + } else { + return reader.getProgress(); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java index 37cffdb..9ad5ab3 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java @@ 
-361,4 +361,29 @@ public class SequenceFileScanner extends FileScanner { public boolean isSplittable(){ return true; } + + @Override + public float getProgress() { + if (!inited) return super.getProgress(); + + if (!more) { + return 1.0f; + } else { + long filePos; + float progress; + try { + filePos = reader.getPosition(); + if (start == filePos) { + progress = 0.0f; + } else { + long readBytes = filePos - start; + long remainingBytes = Math.max(end - filePos, 0); + progress = Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes)); + } + } catch (IOException e) { + progress = 0.0f; + } + return progress; + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index 46d7f6a..12ab738 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -300,50 +300,19 @@ public class DelimitedTextFile { @Override public void init() throws IOException { - if (reader != null) { - reader.close(); - } - - if(deserializer != null) { - deserializer.release(); - } - reader = new DelimitedLineReader(conf, fragment, conf.getInt(READ_BUFFER_SIZE, 128 * StorageUnit.KB)); reader.init(); - recordCount = 0; if (targets == null) { targets = schema.toArray(); } - outTuple = new VTuple(targets.length); + reset(); super.init(); if (LOG.isDebugEnabled()) { LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset); } - - // skip first line if it reads from middle of file - if (startOffset > 0) { - 
reader.readLine(); - } else { // skip header lines if it is defined - - // initialization for skipping header(max 20) - int headerLineNum = Math.min(Integer.parseInt(meta.getProperty(StorageConstants.TEXT_SKIP_HEADER_LINE, "0")), 20); - if (headerLineNum > 0) { - LOG.info(String.format("Skip %d header lines", headerLineNum)); - for (int i = 0; i < headerLineNum; i++) { - if (!reader.isReadable()) { - return; - } - - reader.readLine(); - } - } - } - - deserializer = getLineSerde().createDeserializer(schema, meta, targets); - deserializer.init(); } public TextLineSerDe getLineSerde() { @@ -436,7 +405,44 @@ public class DelimitedTextFile { @Override public void reset() throws IOException { - init(); + recordCount = 0; + + if (reader.getReadBytes() > 0) { + reader.close(); + + reader = new DelimitedLineReader(conf, fragment, conf.getInt(READ_BUFFER_SIZE, 128 * StorageUnit.KB)); + reader.init(); + } + + if(deserializer != null) { + deserializer.release(); + } + + deserializer = getLineSerde().createDeserializer(schema, meta, targets); + deserializer.init(); + + outTuple = new VTuple(targets.length); + + // skip first line if it reads from middle of file + if (startOffset > 0) { + reader.readLine(); + } else { // skip header lines if it is defined + + // initialization for skipping header(max 20) + int headerLineNum = Math.min(Integer.parseInt( + meta.getProperty(StorageConstants.TEXT_SKIP_HEADER_LINE, "0")), 20); + + if (headerLineNum > 0) { + LOG.info(String.format("Skip %d header lines", headerLineNum)); + for (int i = 0; i < headerLineNum; i++) { + if (!reader.isReadable()) { + return; + } + + reader.readLine(); + } + } + } } @Override @@ -446,16 +452,16 @@ public class DelimitedTextFile { deserializer.release(); } - if (tableStats != null && reader != null) { + if (reader != null) { tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. 
(decompressed bytes + overhead) tableStats.setNumRows(recordCount); } + if (LOG.isDebugEnabled()) { LOG.debug("DelimitedTextFileScanner processed record:" + recordCount); } } finally { IOUtils.cleanup(LOG, reader); - reader = null; outTuple = null; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java index 10ac6de..5beba14 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java @@ -134,7 +134,7 @@ class InternalParquetRecordReader<T> { return currentValue; } - public float getProgress() throws IOException, InterruptedException { + public float getProgress() { return (float) current / total; } http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java index 739686f..c353a81 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java @@ -143,4 
+143,12 @@ public class ParquetReader<T> implements Closeable { reader.close(); } } + + public float getProgress() { + if (!footersIterator.hasNext()) { + return 1.0f; + } else { + return reader != null ? reader.getProgress() : 0.0f; + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index c4514b9..a08dfb9 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -48,6 +48,7 @@ import org.apache.tajo.storage.sequencefile.SequenceFileScanner; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.JavaResourceUtil; import org.apache.tajo.util.KeyValueSet; +import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -154,6 +155,11 @@ public class TestStorages { }); } + @After + public void tearDown() throws IOException { + fs.delete(testDir, true); + } + @Test public void testSplitable() throws IOException { if (splitable) { @@ -1303,4 +1309,51 @@ public class TestStorages { IOUtils.cleanup(null, appender); } } + + @Test + public void testProgress() throws IOException { + + Schema schema = new Schema(); + schema.addColumn("col1", Type.FLOAT4); + schema.addColumn("col2", Type.FLOAT8); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + + KeyValueSet options = new KeyValueSet(); + TableMeta meta = CatalogUtil.newTableMeta(dataFormat, options); + if (dataFormat.equalsIgnoreCase(BuiltinStorages.AVRO)) { + 
meta.putProperty(StorageConstants.AVRO_SCHEMA_LITERAL, TEST_MAX_VALUE_AVRO_SCHEMA); + } + + FileTablespace sm = TablespaceManager.getLocalFs(); + Path tablePath = new Path(testDir, "testProgress.data"); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.init(); + + VTuple tuple = new VTuple(new Datum[]{ + DatumFactory.createFloat4(Float.MAX_VALUE), + DatumFactory.createFloat8(Double.MAX_VALUE), + DatumFactory.createInt2(Short.MAX_VALUE), + DatumFactory.createInt4(Integer.MAX_VALUE), + DatumFactory.createInt8(Long.MAX_VALUE) + }); + + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = sm.getScanner(meta, schema, fragment, null); + + assertEquals(0.0f, scanner.getProgress(), 0.0f); + + scanner.init(); + assertNotNull(scanner.next()); + assertNull(null, scanner.next()); + + scanner.close(); + assertEquals(1.0f, scanner.getProgress(), 0.0f); + } } \ No newline at end of file
