TAJO-1266: Too many logs when writing a parquet relation. (DaeMyung Kang via jihoon)

Closes #320

Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4c713fb4
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4c713fb4
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4c713fb4

Branch: refs/heads/index_support
Commit: 4c713fb40464f651994926d3f56205e2e7cc2552
Parents: 533e709
Author: Jihoon Son <[email protected]>
Authored: Thu Dec 25 19:23:53 2014 +0900
Committer: Jihoon Son <[email protected]>
Committed: Thu Dec 25 19:23:53 2014 +0900

----------------------------------------------------------------------
 CHANGES | 3 +++
 .../thirdparty/parquet/ColumnChunkPageWriteStore.java | 6 +++---
 .../parquet/InternalParquetRecordReader.java | 14 ++++++++------
 .../parquet/InternalParquetRecordWriter.java | 4 ++--
 4 files changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/4c713fb4/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 124977d..9cd2a48 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,9 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1266: Too many logs when writing a parquet relation.
+    (DaeMyung Kang via jihoon)
+
     TAJO-1268: tajo-client module should not use UserGroupInformation.
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4c713fb4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
index 0dedd9b..91d4748 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
@@ -36,7 +36,7 @@ import java.io.IOException;
 import java.util.*;
 
 import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
-import static parquet.Log.INFO;
+import static parquet.Log.DEBUG;
 
 class ColumnChunkPageWriteStore implements PageWriteStore {
   private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
@@ -140,8 +140,8 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
       }
       writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
       writer.endColumn();
-      if (INFO) {
-        LOG.info(
+      if (DEBUG) {
+        LOG.debug(
             String.format(
                 "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
                 buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4c713fb4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
index 6bbd7b5..10ac6de 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
@@ -93,14 +93,14 @@ class InternalParquetRecordReader<T> {
       if (current != 0) {
         long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt;
         totalTimeSpentProcessingRecords += timeAssembling;
-        LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
+        if (DEBUG) LOG.debug("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: " + ((float) totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float) totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
         long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
         long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
         long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
-        LOG.info("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
+        if (DEBUG) LOG.debug("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
       }
 
-      LOG.info("at row " + current + ". reading next block");
+      if (DEBUG) LOG.debug("at row " + current + ". reading next block");
       long t0 = System.currentTimeMillis();
       PageReadStore pages = reader.readNextRowGroup();
       if (pages == null) {
@@ -109,8 +109,10 @@ class InternalParquetRecordReader<T> {
       long timeSpentReading = System.currentTimeMillis() - t0;
       totalTimeSpentReadingBytes += timeSpentReading;
       BenchmarkCounter.incrementTime(timeSpentReading);
-      LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
-      if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
+      if (DEBUG) {
+        LOG.debug("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
+        LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
+      }
       MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
       recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
       startedAssemblingCurrentBlockAt = System.currentTimeMillis();
@@ -153,7 +155,7 @@ class InternalParquetRecordReader<T> {
     for (BlockMetaData block : blocks) {
       total += block.getRowCount();
     }
-    LOG.info("RecordReader initialized will read a total of " + total + " records.");
+    if (DEBUG) LOG.debug("RecordReader initialized will read a total of " + total + " records.");
   }
 
   private boolean contains(GroupType group, String[] path, int index) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4c713fb4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
index 532d9a2..da57745 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
@@ -124,7 +124,7 @@ class InternalParquetRecordWriter<T> {
     if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
       long memSize = store.memSize();
       if (memSize > blockSize) {
-        LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount));
+        if (DEBUG) LOG.debug(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount));
         flushStore();
         initStore();
         recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
@@ -145,7 +145,7 @@ class InternalParquetRecordWriter<T> {
 
   private void flushStore()
       throws IOException {
-    LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize()));
+    if (DEBUG) LOG.debug(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize()));
     if (store.allocatedSize() > 3 * blockSize) {
       LOG.warn("Too much memory used: " + store.memUsageString());
     }
