Repository: tajo Updated Branches: refs/heads/master ed6603792 -> 011fcd922
TAJO-1975: Gathering fine-grained column statistics for range shuffle. Closes #859 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/011fcd92 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/011fcd92 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/011fcd92 Branch: refs/heads/master Commit: 011fcd922d0a809e8d6d88de441594fd13f649a0 Parents: ed66037 Author: Jihoon Son <[email protected]> Authored: Fri Nov 13 13:37:39 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Fri Nov 13 13:37:39 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../physical/HashShuffleFileWriteExec.java | 12 +- .../physical/RangeShuffleFileWriteExec.java | 2 +- .../java/org/apache/tajo/storage/Appender.java | 56 ++++- .../apache/tajo/storage/TableStatistics.java | 74 +++---- .../storage/hbase/AbstractHBaseAppender.java | 23 +- .../tajo/storage/hbase/HBasePutAppender.java | 4 +- .../tajo/storage/hbase/HFileAppender.java | 6 +- .../org/apache/tajo/storage/FileAppender.java | 26 ++- .../tajo/storage/HashShuffleAppender.java | 209 ------------------- .../storage/HashShuffleAppenderManager.java | 14 +- .../storage/HashShuffleAppenderWrapper.java | 187 +++++++++++++++++ .../java/org/apache/tajo/storage/RawFile.java | 20 +- .../java/org/apache/tajo/storage/RowFile.java | 10 +- .../apache/tajo/storage/avro/AvroAppender.java | 27 ++- .../apache/tajo/storage/orc/ORCAppender.java | 20 +- .../tajo/storage/parquet/ParquetAppender.java | 35 ++-- .../storage/rawfile/DirectRawFileWriter.java | 10 +- .../org/apache/tajo/storage/rcfile/RCFile.java | 27 +-- .../sequencefile/SequenceFileAppender.java | 10 +- .../tajo/storage/text/DelimitedTextFile.java | 10 +- .../tajo/storage/TestCompressionStorages.java | 3 +- 22 files changed, 405 insertions(+), 382 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index c36ced1..98a780e 100644 --- a/CHANGES +++ b/CHANGES @@ -61,6 +61,8 @@ Release 0.12.0 - unreleased TASKS + TAJO-1975: Gathering fine-grained column statistics for range shuffle. (jihoon) + TAJO-1963: Add more configuration descriptions to document. (jihoon) TAJO-1970: Change the null first syntax. (jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java index a72a375..35a204a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java @@ -27,7 +27,7 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.plan.logical.ShuffleFileWriteNode; -import org.apache.tajo.storage.HashShuffleAppender; +import org.apache.tajo.storage.HashShuffleAppenderWrapper; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.Tuple; import org.apache.tajo.worker.TaskAttemptContext; @@ -45,7 +45,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { private ShuffleFileWriteNode plan; private final TableMeta meta; private Partitioner partitioner; - private Map<Integer, HashShuffleAppender> appenderMap = new HashMap<>(); + private Map<Integer, HashShuffleAppenderWrapper> appenderMap = new HashMap<>(); private final int numShuffleOutputs; private final int [] shuffleKeyIds; private HashShuffleAppenderManager hashShuffleAppenderManager; @@ -79,8 +79,8 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { super.init(); } - private HashShuffleAppender getAppender(int partId) throws IOException { - HashShuffleAppender appender = appenderMap.get(partId); + private HashShuffleAppenderWrapper getAppender(int partId) throws IOException { + HashShuffleAppenderWrapper appender = appenderMap.get(partId); if (appender == null) { appender = hashShuffleAppenderManager.getAppender(context.getConf(), context.getTaskId().getTaskId().getExecutionBlockId(), partId, meta, outSchema); @@ -113,7 +113,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { if (tupleCount >= numHashShuffleBufferTuples) { for (Map.Entry<Integer, TupleList> entry : partitionTuples.entrySet()) { int appendPartId = entry.getKey(); - HashShuffleAppender appender = getAppender(appendPartId); + HashShuffleAppenderWrapper appender = getAppender(appendPartId); int appendedSize = appender.addTuples(context.getTaskId(), entry.getValue()); writtenBytes += appendedSize; entry.getValue().clear(); @@ -125,7 +125,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { // processing remained tuples for (Map.Entry<Integer, TupleList> entry : partitionTuples.entrySet()) { int appendPartId = entry.getKey(); - HashShuffleAppender appender = getAppender(appendPartId); + HashShuffleAppenderWrapper appender = getAppender(appendPartId); int appendedSize = appender.addTuples(context.getTaskId(), entry.getValue()); writtenBytes += appendedSize; entry.getValue().clear(); http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java index 4d01b00..bcd2b17 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java @@ -83,7 +83,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { fs.mkdirs(storeTablePath); this.appender = (FileAppender) ((FileTablespace) TablespaceManager.getDefault()) .getAppender(meta, outSchema, new Path(storeTablePath, "output")); - this.appender.enableStats(); + this.appender.enableStats(keySchema.getAllColumns()); this.appender.init(); this.indexWriter = bst.getIndexWriter(new Path(storeTablePath, "index"), BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java index c5e96ac..9e11799 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java @@ -18,24 +18,74 @@ package org.apache.tajo.storage; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.statistics.TableStats; import java.io.Closeable; import java.io.IOException; +import java.util.List; +/** + * + * Interface for appender. + * Every appender that writes some data to underlying storage needs to implement this interface. + */ public interface Appender extends Closeable { + /** + * Initialize the appender. + * + * @throws IOException + */ void init() throws IOException; + /** + * Write the given tuple. + * + * @param t + * @throws IOException + */ void addTuple(Tuple t) throws IOException; - + + /** + * Flush buffered tuples if they exist. + * + * @throws IOException + */ void flush() throws IOException; + /** + * The total size of written output. + * The result value can be different from the real size due to the buffered data. + * + * @return + * @throws IOException + */ long getEstimatedOutputSize() throws IOException; - + + /** + * Close the appender. + * + * @throws IOException + */ void close() throws IOException; + /** + * Enable statistics collection for the output table. + */ void enableStats(); - + + /** + * Enable statistics collection for the output table as well as its columns. + * Note that statistics are collected on only the specified columns. + * @param columnList a list of columns on which statistics is collected + */ + void enableStats(List<Column> columnList); + + /** + * Return collected statistics. + * + * @return statistics + */ TableStats getStats(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java index aa33ea3..31e07ee 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java @@ -20,11 +20,11 @@ package org.apache.tajo.storage; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.datum.Datum; @@ -33,30 +33,26 @@ import org.apache.tajo.datum.Datum; */ public class TableStatistics { private static final Log LOG = LogFactory.getLog(TableStatistics.class); - private Schema schema; - private VTuple minValues; - private VTuple maxValues; - private long [] numNulls; + private final Schema schema; + private final VTuple minValues; + private final VTuple maxValues; + private final long [] numNulls; private long numRows = 0; - private long numBytes = 0; + private long numBytes = TajoConstants.UNKNOWN_LENGTH; - private boolean [] comparable; + private final boolean[] columnStatsEnabled; - public TableStatistics(Schema schema) { + public TableStatistics(Schema schema, boolean[] columnStatsEnabled) { this.schema = schema; minValues = new VTuple(schema.size()); maxValues = new VTuple(schema.size()); numNulls = new long[schema.size()]; - comparable = new boolean[schema.size()]; - DataType type; + this.columnStatsEnabled = columnStatsEnabled; for (int i = 0; i < schema.size(); i++) { - type = schema.getColumn(i).getDataType(); - if (type.getType() == Type.PROTOBUF) { - comparable[i] = false; - } else { - comparable[i] = true; + if (schema.getColumn(i).getDataType().getType().equals(Type.PROTOBUF)) { + columnStatsEnabled[i] = false; } } } @@ -85,18 +81,14 @@ public class TableStatistics { return this.numBytes; } - public void analyzeNull(int idx) { - numNulls[idx]++; - } - public void analyzeField(int idx, Tuple tuple) { - if (tuple.isBlankOrNull(idx)) { - numNulls[idx]++; - return; - } + if (columnStatsEnabled[idx]) { + if (tuple.isBlankOrNull(idx)) { + numNulls[idx]++; + return; + } - Datum datum = tuple.asDatum(idx); - if (comparable[idx]) { + Datum datum = tuple.asDatum(idx); if (!maxValues.contains(idx) || maxValues.get(idx).compareTo(datum) < 0) { maxValues.put(idx, datum); @@ -112,22 +104,24 @@ public class TableStatistics { TableStats stat = new TableStats(); for (int i = 0; i < schema.size(); i++) { - Column column = schema.getColumn(i); - ColumnStats columnStats = new ColumnStats(column); - columnStats.setNumNulls(numNulls[i]); - if (minValues.isBlank(i) || column.getDataType().getType() == minValues.type(i)) { - columnStats.setMinValue(minValues.get(i)); - } else { - LOG.warn("Wrong statistics column type (" + minValues.type(i) + - ", expected=" + column.getDataType().getType() + ")"); - } - if (minValues.isBlank(i) || column.getDataType().getType() == maxValues.type(i)) { - columnStats.setMaxValue(maxValues.get(i)); - } else { - LOG.warn("Wrong statistics column type (" + maxValues.type(i) + - ", expected=" + column.getDataType().getType() + ")"); + if (columnStatsEnabled[i]) { + Column column = schema.getColumn(i); + ColumnStats columnStats = new ColumnStats(column); + columnStats.setNumNulls(numNulls[i]); + if (minValues.isBlank(i) || column.getDataType().getType() == minValues.type(i)) { + columnStats.setMinValue(minValues.get(i)); + } else { + LOG.warn("Wrong statistics column type (" + minValues.type(i) + + ", expected=" + column.getDataType().getType() + ")"); + } + if (minValues.isBlank(i) || column.getDataType().getType() == maxValues.type(i)) { + columnStats.setMaxValue(maxValues.get(i)); + } else { + LOG.warn("Wrong statistics column type (" + maxValues.type(i) + + ", expected=" + column.getDataType().getType() + ")"); + } + stat.addColumnStat(columnStats); } - stat.addColumnStat(columnStats); } stat.setNumRows(this.numRows); http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java index b5f39a0..4289026 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.KeyValue; import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; @@ -53,7 +54,8 @@ public abstract class AbstractHBaseAppender implements Appender { protected ColumnMapping columnMapping; protected TableStatistics stats; - protected boolean enabledStats; + protected boolean tableStatsEnabled; + protected boolean[] columnStatsEnabled; protected int columnNum; @@ -88,8 +90,8 @@ public abstract class AbstractHBaseAppender implements Appender { throw new IllegalStateException("FileAppender is already initialized."); } inited = true; - if (enabledStats) { - stats = new TableStatistics(this.schema); + if (tableStatsEnabled) { + stats = new TableStatistics(this.schema, columnStatsEnabled); } try { columnMapping = new ColumnMapping(schema, meta.getOptions()); @@ -210,12 +212,23 @@ public abstract class AbstractHBaseAppender implements Appender { @Override public void enableStats() { - enabledStats = true; + if (inited) { + throw new IllegalStateException("Should enable this option before init()"); + } + + this.tableStatsEnabled = true; + this.columnStatsEnabled = new boolean[schema.size()]; + } + + @Override + public void enableStats(List<Column> columnList) { + enableStats(); + columnList.forEach(column -> columnStatsEnabled[schema.getIndex(column)] = true); } @Override public TableStats getStats() { - if (enabledStats) { + if (tableStatsEnabled) { return stats.getTableStat(); } else { return null; http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java index 20b1a08..337c062 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java @@ -89,7 +89,7 @@ public class HBasePutAppender extends AbstractHBaseAppender { htable.put(put); - if (enabledStats) { + if (tableStatsEnabled) { stats.incrementRow(); stats.setNumBytes(totalNumBytes); } @@ -111,7 +111,7 @@ public class HBasePutAppender extends AbstractHBaseAppender { htable.flushCommits(); htable.close(); } - if (enabledStats) { + if (tableStatsEnabled) { stats.setNumBytes(totalNumBytes); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java index 42f25cc..228d5a4 100644 --- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java @@ -104,7 +104,7 @@ public class HFileAppender extends AbstractHBaseAppender { } kvSet.clear(); // Statistical section - if (enabledStats) { + if (tableStatsEnabled) { stats.incrementRow(); } } catch (InterruptedException e) { @@ -145,7 +145,7 @@ public class HFileAppender extends AbstractHBaseAppender { } kvSet.clear(); // Statistical section - if (enabledStats) { + if (tableStatsEnabled) { stats.incrementRow(); } } catch (InterruptedException e) { @@ -153,7 +153,7 @@ public class HFileAppender extends AbstractHBaseAppender { } } - if (enabledStats) { + if (tableStatsEnabled) { stats.setNumBytes(totalNumBytes); } if (writer != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java index c6a690b..568df8c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java @@ -18,20 +18,19 @@ package org.apache.tajo.storage; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.NotImplementedException; import java.io.IOException; +import java.util.List; public abstract class FileAppender implements Appender { - private static final Log LOG = LogFactory.getLog(FileAppender.class); - protected boolean inited = false; protected final Configuration conf; @@ -40,7 +39,8 @@ public abstract class FileAppender implements Appender { protected final Path workDir; protected final TaskAttemptId taskAttemptId; - protected boolean enabledStats; + protected boolean tableStatsEnabled; + protected boolean[] columnStatsEnabled; protected Path path; public FileAppender(Configuration conf, TaskAttemptId taskAttemptId, Schema schema, @@ -65,6 +65,7 @@ public abstract class FileAppender implements Appender { } } + @Override public void init() throws IOException { if (inited) { throw new IllegalStateException("FileAppender is already initialized."); @@ -72,17 +73,28 @@ public abstract class FileAppender implements Appender { inited = true; } + @Override public void enableStats() { if (inited) { throw new IllegalStateException("Should enable this option before init()"); } - this.enabledStats = true; + this.tableStatsEnabled = true; + this.columnStatsEnabled = new boolean[schema.size()]; + } + + @Override + public void enableStats(List<Column> columnList) { + enableStats(); + columnList.forEach(column -> columnStatsEnabled[schema.getIndex(column)] = true); } + @Override public long getEstimatedOutputSize() throws IOException { return getOffset(); } - public abstract long getOffset() throws IOException; + public long getOffset() throws IOException { + throw new IOException(new NotImplementedException()); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java deleted file mode 100644 index a82c7ec..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java +++ /dev/null @@ -1,209 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.util.Pair; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -public class HashShuffleAppender implements Appender { - private static Log LOG = LogFactory.getLog(HashShuffleAppender.class); - - private FileAppender appender; - private AtomicBoolean closed = new AtomicBoolean(false); - private int partId; - - private TableStats tableStats; - - //<taskId,<page start offset,<task start, task end>>> - private Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes; - - //page start offset, length - private List<Pair<Long, Integer>> pages = new ArrayList<>(); - - private Pair<Long, Integer> currentPage; - - private int pageSize; //MB - - private int rowNumInPage; - - private int totalRows; - - private long offset; - - private ExecutionBlockId ebId; - - public HashShuffleAppender(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) { - this.ebId = ebId; - this.partId = partId; - this.appender = appender; - this.pageSize = pageSize; - } - - @Override - public void init() throws IOException { - currentPage = new Pair(0L, 0); - taskTupleIndexes = new HashMap<>(); - rowNumInPage = 0; - } - - /** - * Write multiple tuples. Each tuple is written by a FileAppender which is responsible specified partition. - * After writing if a current page exceeds pageSize, pageOffset will be added. - * @param taskId - * @param tuples - * @return written bytes - * @throws java.io.IOException - */ - public int addTuples(TaskAttemptId taskId, List<Tuple> tuples) throws IOException { - synchronized(appender) { - if (closed.get()) { - return 0; - } - long currentPos = appender.getOffset(); - - for (Tuple eachTuple: tuples) { - appender.addTuple(eachTuple); - } - long posAfterWritten = appender.getOffset(); - - int writtenBytes = (int)(posAfterWritten - currentPos); - - int nextRowNum = rowNumInPage + tuples.size(); - List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId); - if (taskIndexes == null) { - taskIndexes = new ArrayList<>(); - taskTupleIndexes.put(taskId, taskIndexes); - } - taskIndexes.add( - new Pair<>(currentPage.getFirst(), new Pair(rowNumInPage, nextRowNum))); - rowNumInPage = nextRowNum; - - if (posAfterWritten - currentPage.getFirst() > pageSize) { - nextPage(posAfterWritten); - rowNumInPage = 0; - } - - totalRows += tuples.size(); - return writtenBytes; - } - } - - public long getOffset() throws IOException { - if (closed.get()) { - return offset; - } else { - return appender.getOffset(); - } - } - - private void nextPage(long pos) { - currentPage.setSecond((int) (pos - currentPage.getFirst())); - pages.add(currentPage); - currentPage = new Pair(pos, 0); - } - - @Override - public void addTuple(Tuple t) throws IOException { - throw new IOException("Not support addTuple, use addTuples()"); - } - - @Override - public void flush() throws IOException { - synchronized(appender) { - if (closed.get()) { - return; - } - appender.flush(); - } - } - - @Override - public long getEstimatedOutputSize() throws IOException { - return pageSize * pages.size(); - } - - @Override - public void close() throws IOException { - synchronized(appender) { - if (closed.get()) { - return; - } - appender.flush(); - offset = appender.getOffset(); - if (offset > currentPage.getFirst()) { - nextPage(offset); - } - appender.close(); - if (LOG.isDebugEnabled()) { - if (!pages.isEmpty()) { - LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size() - + ", lastPage=" + pages.get(pages.size() - 1)); - } else { - LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()); - } - } - closed.set(true); - tableStats = appender.getStats(); - } - } - - @Override - public void enableStats() { - } - - @Override - public TableStats getStats() { - synchronized(appender) { - return appender.getStats(); - } - } - - public List<Pair<Long, Integer>> getPages() { - return pages; - } - - public Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() { - return taskTupleIndexes; - } - - public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() { - List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<>(); - - for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values()) { - merged.addAll(eachFailureIndex); - } - - return merged; - } - - public void taskFinished(TaskAttemptId taskId) { - taskTupleIndexes.remove(taskId); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java index 4297e4d..62df9a3 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java @@ -62,8 +62,8 @@ public class HashShuffleAppenderManager { pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024; } - public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId, - TableMeta meta, Schema outSchema) throws IOException { + public HashShuffleAppenderWrapper getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId, + TableMeta meta, Schema outSchema) throws IOException { synchronized (appenderMap) { Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId); @@ -93,7 +93,7 @@ public class HashShuffleAppenderManager { partitionAppenderMeta = new PartitionAppenderMeta(); partitionAppenderMeta.partId = partId; partitionAppenderMeta.dataFile = dataFile; - partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender); + partitionAppenderMeta.appender = new HashShuffleAppenderWrapper(ebId, partId, pageSize, appender); partitionAppenderMeta.appender.init(); partitionAppenderMap.put(partId, partitionAppenderMeta); @@ -132,7 +132,7 @@ public class HashShuffleAppenderManager { } if (partitionAppenderMap == null) { - LOG.info("Close HashShuffleAppender:" + ebId + ", not a hash shuffle"); + LOG.info("Close HashShuffleAppenderWrapper:" + ebId + ", not a hash shuffle"); return null; } @@ -152,7 +152,7 @@ public class HashShuffleAppenderManager { } } - LOG.info("Close HashShuffleAppender:" + ebId + ", intermediates=" + intermediateEntries.size()); + LOG.info("Close HashShuffleAppenderWrapper:" + ebId + ", intermediates=" + intermediateEntries.size()); return intermediateEntries; } @@ -210,14 +210,14 @@ public class HashShuffleAppenderManager { static class PartitionAppenderMeta { int partId; - HashShuffleAppender appender; + HashShuffleAppenderWrapper appender; Path dataFile; public int getPartId() { return partId; } - public HashShuffleAppender getAppender() { + public HashShuffleAppenderWrapper getAppender() { return appender; } http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java new file mode 100644 index 0000000..bcd4388 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.util.Pair; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class HashShuffleAppenderWrapper implements Closeable { + private static Log LOG = LogFactory.getLog(HashShuffleAppenderWrapper.class); + + private FileAppender appender; + private AtomicBoolean closed = new AtomicBoolean(false); + private int partId; + + //<taskId,<page start offset,<task start, task end>>> + private Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes; + + //page start offset, length + private List<Pair<Long, Integer>> pages = new ArrayList<>(); + + private Pair<Long, Integer> currentPage; + + private int pageSize; //MB + + private int rowNumInPage; + + private long offset; + + private ExecutionBlockId ebId; + + public HashShuffleAppenderWrapper(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) { + this.ebId = ebId; + this.partId = partId; + this.appender = appender; + this.pageSize = pageSize; + } + + public void init() throws IOException { + currentPage = new Pair(0L, 0); + taskTupleIndexes = new HashMap<>(); + rowNumInPage = 0; + } + + /** + * Write multiple tuples. Each tuple is written by a FileAppender which is responsible specified partition. + * After writing if a current page exceeds pageSize, pageOffset will be added. + * @param taskId + * @param tuples + * @return written bytes + * @throws java.io.IOException + */ + public int addTuples(TaskAttemptId taskId, List<Tuple> tuples) throws IOException { + synchronized(appender) { + if (closed.get()) { + return 0; + } + long currentPos = appender.getOffset(); + + for (Tuple eachTuple: tuples) { + appender.addTuple(eachTuple); + } + long posAfterWritten = appender.getOffset(); + + int writtenBytes = (int)(posAfterWritten - currentPos); + + int nextRowNum = rowNumInPage + tuples.size(); + List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId); + if (taskIndexes == null) { + taskIndexes = new ArrayList<>(); + taskTupleIndexes.put(taskId, taskIndexes); + } + taskIndexes.add( + new Pair<>(currentPage.getFirst(), new Pair(rowNumInPage, nextRowNum))); + rowNumInPage = nextRowNum; + + if (posAfterWritten - currentPage.getFirst() > pageSize) { + nextPage(posAfterWritten); + rowNumInPage = 0; + } + + return writtenBytes; + } + } + + public long getOffset() throws IOException { + if (closed.get()) { + return offset; + } else { + return appender.getOffset(); + } + } + + private void nextPage(long pos) { + currentPage.setSecond((int) (pos - currentPage.getFirst())); + pages.add(currentPage); + currentPage = new Pair(pos, 0); + } + + public void addTuple(Tuple t) throws IOException { + throw new IOException("Not support addTuple, use addTuples()"); + } + + public void flush() throws IOException { + synchronized(appender) { + if (closed.get()) { + return; + } + appender.flush(); + } + } + + @Override + public void close() throws IOException { + synchronized(appender) { + if (closed.get()) { + return; + } + appender.flush(); + offset = appender.getOffset(); + if (offset > currentPage.getFirst()) { + nextPage(offset); + } + appender.close(); + if (LOG.isDebugEnabled()) { + if (!pages.isEmpty()) { + LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size() + + ", lastPage=" + pages.get(pages.size() - 1)); + } else { + LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()); + } + } + closed.set(true); + } + } + + public TableStats getStats() { + synchronized(appender) { + return appender.getStats(); + } + } + + public List<Pair<Long, Integer>> getPages() { + return pages; + } + + public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() { + List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<>(); + + for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values()) { + merged.addAll(eachFailureIndex); + } + + return merged; + } + + public void taskFinished(TaskAttemptId taskId) { + taskTupleIndexes.remove(taskId); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java index a7b33fa..cda39f9 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java @@ -35,8 +35,6 @@ import org.apache.tajo.datum.ProtobufDatumFactory; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.expr.EvalNode; -import org.apache.tajo.plan.serder.PlanProto.ShuffleType; -import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.BitArray; @@ -474,7 +472,6 @@ public class RawFile { private int headerSize = 0; private static final int RECORD_SIZE = 4; private long pos; - private ShuffleType shuffleType; private TableStatistics stats; @@ -512,11 +509,8 @@ public class RawFile { nullFlags = new BitArray(schema.size()); headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); - if (enabledStats) { - this.stats = new TableStatistics(this.schema); - this.shuffleType = PlannerUtil.getShuffleType( - meta.getOption(StorageConstants.SHUFFLE_TYPE, - PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE))); + if (tableStatsEnabled) { + this.stats = new TableStatistics(this.schema, columnStatsEnabled); } super.init(); @@ -646,8 +640,8 @@ public class RawFile { // reset the null flags nullFlags.clear(); for (int i = 0; i < schema.size(); i++) { - if (shuffleType == ShuffleType.RANGE_SHUFFLE) { - // it is to calculate min/max values, and it is only used for the intermediate file. + // it is to calculate min/max values, and it is only used for the intermediate file. + if (tableStatsEnabled) { stats.analyzeField(i, t); } @@ -751,7 +745,7 @@ public class RawFile { pos += bufferPos - recordOffset; buffer.position(bufferPos); - if (enabledStats) { + if (tableStatsEnabled) { stats.incrementRow(); } } @@ -766,7 +760,7 @@ public class RawFile { @Override public void close() throws IOException { flush(); - if (enabledStats) { + if (tableStatsEnabled) { stats.setNumBytes(getOffset()); } if (LOG.isDebugEnabled()) { @@ -786,7 +780,7 @@ public class RawFile { @Override public TableStats getStats() { - if (enabledStats) { + if (tableStatsEnabled) { stats.setNumBytes(pos); return stats.getTableStat(); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java index 45206f5..87a112e 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java @@ -366,8 +366,8 @@ public class RowFile { nullFlags = new BitArray(schema.size()); - if (enabledStats) { - this.stats = new TableStatistics(this.schema); + if (tableStatsEnabled) { + this.stats = new TableStatistics(this.schema, columnStatsEnabled); this.shuffleType = PlannerUtil.getShuffleType( meta.getOption(StorageConstants.SHUFFLE_TYPE, PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE))); @@ -462,7 +462,7 @@ public class RowFile { out.write(bytes, 0, dataLen); // Statistical section - if (enabledStats) { + if (tableStatsEnabled) { stats.incrementRow(); } } @@ -480,7 +480,7 @@ public class RowFile { @Override public void close() throws IOException { if (out != null) { - if (enabledStats) { + if (tableStatsEnabled) { stats.setNumBytes(out.getPos()); } sync(); @@ -505,7 +505,7 @@ public class RowFile { @Override public TableStats getStats() { - if (enabledStats) { + if (tableStatsEnabled) { return stats.getTableStat(); } else { return null; http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java index e54fb80..29f4534 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java @@ -81,23 +81,12 @@ public class AvroAppender extends FileAppender { dataFileWriter = new DataFileWriter<>(datumWriter); dataFileWriter.create(avroSchema, outputStream); - if (enabledStats) { - this.stats = new TableStatistics(schema); + if (tableStatsEnabled) { + this.stats = new TableStatistics(schema, columnStatsEnabled); } super.init(); } - /** - * Gets the current offset. Tracking offsets is currently not implemented, so - * this method always returns 0. - * - * @return 0 - */ - @Override - public long getOffset() throws IOException { - return 0; - } - private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) { if (tuple.isBlankOrNull(i)) { return null; @@ -173,10 +162,13 @@ public class AvroAppender extends FileAppender { throw new RuntimeException("Unknown type: " + avroType); } record.put(i, value); + if (tableStatsEnabled) { + stats.analyzeField(i, tuple); + } } dataFileWriter.append(record); - if (enabledStats) { + if (tableStatsEnabled) { stats.incrementRow(); } } @@ -195,6 +187,11 @@ public class AvroAppender extends FileAppender { @Override public void close() throws IOException { IOUtils.cleanup(null, dataFileWriter); + + // TODO: getOffset is not implemented yet +// if (tableStatsEnabled) { +// stats.setNumBytes(getOffset()); +// } } /** @@ -204,7 +201,7 @@ public class AvroAppender extends FileAppender { */ @Override public TableStats getStats() { - if (enabledStats) { + if (tableStatsEnabled) { return stats.getTableStat(); } else { return null; http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java index dbbf5a6..7d00206 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java @@ -62,27 +62,22 @@ public class ORCAppender extends FileAppender { StorageConstants.DEFAULT_ORC_ROW_INDEX_STRIDE)), timezone); - if (enabledStats) { - this.stats = new TableStatistics(schema); + if (tableStatsEnabled) { + this.stats = new TableStatistics(schema, columnStatsEnabled); } super.init(); } @Override - public long getOffset() throws IOException { - return 0; - } - - @Override public void addTuple(Tuple tuple) throws IOException { - if (enabledStats) { + if (tableStatsEnabled) { for (int i = 0; i < schema.size(); ++i) { stats.analyzeField(i, tuple); } } writer.addTuple(tuple); - if (enabledStats) { + if (tableStatsEnabled) { stats.incrementRow(); } } @@ -94,11 +89,16 @@ public class ORCAppender extends FileAppender { @Override public void close() throws IOException { writer.close(); + + // TODO: getOffset is not implemented yet +// if (tableStatsEnabled) { +// stats.setNumBytes(getOffset()); +// } } @Override public TableStats getStats() { - if (enabledStats) { + if (tableStatsEnabled) { return stats.getTableStat(); } else { return null; http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java index 41e4269..07c9d0e 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java @@ -18,20 +18,19 @@ package org.apache.tajo.storage.parquet; -import org.apache.hadoop.io.IOUtils; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.storage.StorageConstants; -import parquet.hadoop.ParquetOutputFormat; -import parquet.hadoop.metadata.CompressionCodecName; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.storage.FileAppender; +import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.storage.TableStatistics; import org.apache.tajo.storage.Tuple; +import parquet.hadoop.ParquetOutputFormat; +import parquet.hadoop.metadata.CompressionCodecName; import java.io.IOException; @@ -82,24 +81,13 @@ public class ParquetAppender extends FileAppender { pageSize, enableDictionary, validating); - if (enabledStats) { - this.stats = new TableStatistics(schema); + if (tableStatsEnabled) { + this.stats = new TableStatistics(schema, columnStatsEnabled); } super.init(); } /** - * Gets the current offset. Tracking offsets is currently not implemented, so - * this method always returns 0. - * - * @return 0 - */ - @Override - public long getOffset() throws IOException { - return 0; - } - - /** * Write a Tuple to the Parquet file. * * @param tuple The Tuple to write. @@ -107,7 +95,7 @@ public class ParquetAppender extends FileAppender { @Override public void addTuple(Tuple tuple) throws IOException { writer.write(tuple); - if (enabledStats) { + if (tableStatsEnabled) { stats.incrementRow(); } } @@ -125,6 +113,11 @@ public class ParquetAppender extends FileAppender { @Override public void close() throws IOException { IOUtils.cleanup(null, writer); + + // TODO: getOffset is not implemented yet +// if (tableStatsEnabled) { +// stats.setNumBytes(getOffset()); +// } } public long getEstimatedOutputSize() throws IOException { @@ -138,7 +131,7 @@ public class ParquetAppender extends FileAppender { */ @Override public TableStats getStats() { - if (enabledStats) { + if (tableStatsEnabled) { return stats.getTableStat(); } else { return null; http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java index 03642a7..23ef059 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java @@ -89,8 +89,8 @@ public class DirectRawFileWriter extends FileAppender { isLocal = false; } - if (enabledStats) { - this.stats = new TableStatistics(this.schema); + if (tableStatsEnabled) { + this.stats = new TableStatistics(this.schema, columnStatsEnabled); this.shuffleType = PlannerUtil.getShuffleType( meta.getOption(StorageConstants.SHUFFLE_TYPE, PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE))); @@ -115,7 +115,7 @@ public class DirectRawFileWriter extends FileAppender { rowBlock.getMemory().clear(); - if (enabledStats) { + if (tableStatsEnabled) { stats.incrementRows(rowBlock.rows() - stats.getNumRows()); } } @@ -147,7 +147,7 @@ public class DirectRawFileWriter extends FileAppender { public void close() throws IOException { flush(); - if (enabledStats) { + if (tableStatsEnabled) { stats.setNumBytes(getOffset()); } if (LOG.isDebugEnabled()) { @@ -160,7 +160,7 @@ public class DirectRawFileWriter extends FileAppender { @Override public TableStats getStats() { - if (enabledStats) { + if (tableStatsEnabled) { stats.setNumBytes(pos); return stats.getTableStat(); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java index ed55506..20519b7 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java @@ -601,7 +601,6 @@ public class RCFile { boolean useNewMagic = true; private byte[] nullChars; private SerializerDeserializer serde; - private boolean isShuffle; // Insert a globally unique 16-byte value every few entries, so that one // can seek into the middle of a file and then synchronize with record @@ -774,8 +773,8 @@ public class RCFile { writeFileHeader(); finalizeFileHeader(); - if (enabledStats) { - this.stats = new TableStatistics(this.schema); + if (tableStatsEnabled) { + this.stats = new TableStatistics(this.schema, columnStatsEnabled); } super.init(); } @@ -866,7 +865,7 @@ public class RCFile { append(t); // Statistical section - if (enabledStats) { + if (tableStatsEnabled) { stats.incrementRow(); } } @@ -882,22 +881,14 @@ public class RCFile { * @throws java.io.IOException */ public void append(Tuple tuple) throws IOException { - int size = schema.size(); - - for (int i = 0; i < size; i++) { + for (int i = 0; i < columnNumber; i++) { + if (tableStatsEnabled) { + stats.analyzeField(i, tuple); + } int length = columnBuffers[i].append(tuple, i); columnBufferSize += length; } - if (size < columnNumber) { - for (int i = size; i < columnNumber; i++) { - columnBuffers[i].append(NullDatum.get()); - if (isShuffle) { - stats.analyzeNull(i); - } - } - } - bufferedRecords++; //TODO compression rate base flush if ((columnBufferSize > COLUMNS_BUFFER_SIZE) @@ -1077,7 +1068,7 @@ public class RCFile { @Override public TableStats getStats() { - if (enabledStats) { + if (tableStatsEnabled) { return stats.getTableStat(); } else { return null; @@ -1093,7 +1084,7 @@ public class RCFile { if (out != null) { // Statistical section - if (enabledStats) { + if (tableStatsEnabled) { stats.setNumBytes(getOffset()); } // Close the underlying stream if we own it... http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java index b1a14e3..8e0a88c 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java @@ -134,8 +134,8 @@ public class SequenceFileAppender extends FileAppender { writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.NONE, codec); } - if (enabledStats) { - this.stats = new TableStatistics(this.schema); + if (tableStatsEnabled) { + this.stats = new TableStatistics(this.schema, columnStatsEnabled); } super.init(); @@ -203,7 +203,7 @@ public class SequenceFileAppender extends FileAppender { pos += writer.getLength(); rowCount++; - if (enabledStats) { + if (tableStatsEnabled) { stats.incrementRow(); } } @@ -221,7 +221,7 @@ public class SequenceFileAppender extends FileAppender { @Override public void close() throws IOException { // Statistical section - if (enabledStats) { + if (tableStatsEnabled) { stats.setNumBytes(getOffset()); } @@ -230,7 +230,7 @@ public class SequenceFileAppender extends FileAppender { @Override public TableStats getStats() { - if (enabledStats) { + if (tableStatsEnabled) { return stats.getTableStat(); } else { return null; http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index c0ee784..53fbd57 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -131,8 +131,8 @@ public class DelimitedTextFile { @Override public void init() throws IOException { - if (enabledStats) { - this.stats = new TableStatistics(this.schema); + if (tableStatsEnabled) { + this.stats = new TableStatistics(this.schema, columnStatsEnabled); } if(serializer != null) { @@ -189,7 +189,7 @@ public class DelimitedTextFile { flushBuffer(); } // Statistical section - if (enabledStats) { + if (tableStatsEnabled) { stats.incrementRow(); } } @@ -226,7 +226,7 @@ public class DelimitedTextFile { flush(); // Statistical section - if (enabledStats) { + if (tableStatsEnabled) { stats.setNumBytes(getOffset()); } @@ -246,7 +246,7 @@ public class DelimitedTextFile { @Override public TableStats getStats() { - if (enabledStats) { + if (tableStatsEnabled) { return stats.getTableStat(); } else { return null; http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java index cc69119..9c30202 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java @@ -161,8 +161,7 @@ public class TestCompressionStorages { } int tupleCnt = 0; - Tuple tuple; - while ((tuple = scanner.next()) != null) { + while ((scanner.next()) != null) { tupleCnt++; } scanner.close();
