PARQUET-1025: Support new min-max statistics in parquet-mr Author: Gabor Szadovszky <gabor.szadovs...@cloudera.com>
Closes #435 from gszadovszky/PARQUET-1025 and squashes the following commits: 2a63fcf13 [Gabor Szadovszky] PARQUET-1025: Use constant instead of creating new TypeDefinedOrder instances 820df6fb7 [Gabor Szadovszky] PARQUET-1025: Minor fixes at data generation for TestStatistics dc838f273 [Gabor Szadovszky] PARQUET-1025: Implement ColumnOrder; other updates for rdblue's findings 524750be0 [Gabor Szadovszky] PARQUET-1025: Some updates for zi's findings a2ae97ce5 [Gabor Szadovszky] PARQUET-1025: Unified formatting/comments/deprecation bc86e8a63 [Gabor Szadovszky] PARQUET-1025: Updates according to rdblue's comments 70e56a759 [Gabor Szadovszky] PARQUET-1025: Add explicit list of types to not to read/write statistics 95199e5e0 [Gabor Szadovszky] PARQUET-1025: Use lexicographical comparison for Binary.compareTo Also rename SIGNED_BINARY_COMPARATOR to a more descriptive name Also added comments for haxa representation of values at unsigned comparison testing 2f28c2c0e [Gabor Szadovszky] PARQUET-1025: Finalize read/write stats updates c5536a0a3 [Gabor Szadovszky] PARQUET-1025: Some modifications according to zi's comments 318e585d9 [Gabor Szadovszky] PARQUET-1025: Finalize reading/writing new stats; modify/implement unit tests accordingly 688ef2efe [Gabor Szadovszky] PARQUET-1025: Updates according to zi's and rdblue's comments 51bc1f827 [Gabor Szadovszky] PARQUET-1025: Add the proper comparators as required; revert Binary related changes 20b937f46 [Gabor Szadovszky] PARQUET-1025: reading/writing new min-max statistics; use the comparators as needed 52cd58f61 [Gabor Szadovszky] PARQUET-1025: Move comparators to Type 3378b6d34 [Gabor Szadovszky] PARQUET-1025: Implement comparators and use them with statistics e1719bb3b [Gabor Szadovszky] PARQUET-1025: Refactor Binary to prepare from custom comparators Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/c6764c4a Tree: 
http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/c6764c4a Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/c6764c4a Branch: refs/heads/master Commit: c6764c4a0848abf1d581e22df8b33e28ee9f2ced Parents: 4d996d1 Author: Gabor Szadovszky <gabor.szadovs...@cloudera.com> Authored: Fri Jan 12 16:29:48 2018 -0800 Committer: Ryan Blue <b...@apache.org> Committed: Fri Jan 12 16:29:48 2018 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/parquet/cli/Util.java | 50 +-- .../cli/commands/CheckParquet251Command.java | 11 +- .../apache/parquet/column/ColumnDescriptor.java | 40 ++- .../parquet/column/impl/ColumnWriterV1.java | 2 +- .../parquet/column/impl/ColumnWriterV2.java | 2 +- .../column/statistics/BinaryStatistics.java | 49 ++- .../column/statistics/BooleanStatistics.java | 53 ++- .../column/statistics/DoubleStatistics.java | 56 ++- .../column/statistics/FloatStatistics.java | 57 ++- .../column/statistics/IntStatistics.java | 57 ++- .../column/statistics/LongStatistics.java | 57 ++- .../parquet/column/statistics/Statistics.java | 195 +++++++++-- .../statistics/StatisticsClassException.java | 14 +- .../parquet/filter2/predicate/Statistics.java | 36 ++ ...ntallyUpdatedFilterPredicateBuilderBase.java | 19 + .../org/apache/parquet/io/MessageColumnIO.java | 2 +- .../apache/parquet/io/PrimitiveColumnIO.java | 7 +- .../java/org/apache/parquet/io/api/Binary.java | 120 +------ .../org/apache/parquet/schema/ColumnOrder.java | 97 +++++ .../org/apache/parquet/schema/MessageType.java | 6 +- .../parquet/schema/PrimitiveComparator.java | 290 +++++++++++++++ .../apache/parquet/schema/PrimitiveType.java | 187 +++++++++- .../java/org/apache/parquet/schema/Types.java | 20 +- .../column/statistics/TestStatistics.java | 28 ++ .../org/apache/parquet/io/api/TestBinary.java | 20 ++ .../apache/parquet/schema/TestMessageType.java | 45 +++ .../parquet/schema/TestPrimitiveComparator.java | 311 ++++++++++++++++ 
.../apache/parquet/schema/TestTypeBuilders.java | 47 +++ ...mentallyUpdatedFilterPredicateGenerator.java | 43 +-- .../dictionarylevel/DictionaryFilter.java | 15 +- .../statisticslevel/StatisticsFilter.java | 17 +- .../converter/ParquetMetadataConverter.java | 137 ++++++-- .../hadoop/ColumnChunkPageWriteStore.java | 20 +- .../parquet/hadoop/ParquetFileWriter.java | 22 +- .../hadoop/metadata/ColumnChunkMetaData.java | 39 ++- .../hadoop/metadata/ColumnChunkProperties.java | 30 +- .../converter/TestParquetMetadataConverter.java | 351 +++++++++++++++++-- .../parquet/hadoop/TestParquetFileWriter.java | 8 +- .../org/apache/parquet/hadoop/TestUtils.java | 21 ++ .../apache/parquet/statistics/RandomValues.java | 97 ++++- .../parquet/statistics/TestStatistics.java | 196 +++++++---- .../thrift/TestThriftToParquetFileWriter.java | 23 +- 42 files changed, 2398 insertions(+), 499 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java ---------------------------------------------------------------------- diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java index 07a5364..04b3901 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java @@ -29,10 +29,6 @@ import org.apache.parquet.column.Encoding; import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.statistics.BinaryStatistics; import org.apache.parquet.column.statistics.BooleanStatistics; -import org.apache.parquet.column.statistics.DoubleStatistics; -import org.apache.parquet.column.statistics.FloatStatistics; -import org.apache.parquet.column.statistics.IntStatistics; -import org.apache.parquet.column.statistics.LongStatistics; import org.apache.parquet.column.statistics.Statistics; import 
org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageType; @@ -40,7 +36,6 @@ import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; import java.nio.charset.StandardCharsets; -import java.util.Locale; import java.util.Set; import static org.apache.parquet.column.Encoding.BIT_PACKED; @@ -96,34 +91,14 @@ public class Util { return ""; } // TODO: use original types when showing decimal, timestamp, etc. - if (stats instanceof BooleanStatistics) { - return String.format("%s / %s", - ((BooleanStatistics) stats).getMin(), - ((BooleanStatistics) stats).getMax()); - } else if (stats instanceof IntStatistics) { - return String.format("%d / %d", - ((IntStatistics) stats).getMin(), - ((IntStatistics) stats).getMax()); - } else if (stats instanceof LongStatistics) { - return String.format("%d / %d", - ((LongStatistics) stats).getMin(), - ((LongStatistics) stats).getMax()); - } else if (stats instanceof FloatStatistics) { - return String.format("%f / %f", - ((FloatStatistics) stats).getMin(), - ((FloatStatistics) stats).getMax()); - } else if (stats instanceof DoubleStatistics) { - return String.format("%f / %f", - ((DoubleStatistics) stats).getMin(), - ((DoubleStatistics) stats).getMax()); - } else if (stats instanceof BinaryStatistics) { + if (stats instanceof BinaryStatistics) { byte[] minBytes = stats.getMinBytes(); byte[] maxBytes = stats.getMaxBytes(); return String.format("%s / %s", printable(minBytes, annotation == OriginalType.UTF8, 30), printable(maxBytes, annotation == OriginalType.UTF8, 30)); } else { - throw new RuntimeException("Unknown stats type: " + stats); + return String.format("%s / %s", stats.minAsString(), stats.maxAsString()); } } @@ -134,24 +109,6 @@ public class Util { // TODO: use original types when showing decimal, timestamp, etc. 
if (stats instanceof BooleanStatistics) { return String.format("nulls: %d/%d", stats.getNumNulls(), count); - } else if (stats instanceof IntStatistics) { - return String.format("min: %d max: %d nulls: %d/%d", - ((IntStatistics) stats).getMin(), ((IntStatistics) stats).getMax(), - stats.getNumNulls(), count); - } else if (stats instanceof LongStatistics) { - return String.format("min: %d max: %d nulls: %d/%d", - ((LongStatistics) stats).getMin(), ((LongStatistics) stats).getMax(), - stats.getNumNulls(), count); - } else if (stats instanceof FloatStatistics) { - return String.format("min: %f max: %f nulls: %d/%d", - ((FloatStatistics) stats).getMin(), - ((FloatStatistics) stats).getMax(), - stats.getNumNulls(), count); - } else if (stats instanceof DoubleStatistics) { - return String.format("min: %f max: %f nulls: %d/%d", - ((DoubleStatistics) stats).getMin(), - ((DoubleStatistics) stats).getMax(), - stats.getNumNulls(), count); } else if (stats instanceof BinaryStatistics) { byte[] minBytes = stats.getMinBytes(); byte[] maxBytes = stats.getMaxBytes(); @@ -160,7 +117,8 @@ public class Util { printable(maxBytes, annotation == OriginalType.UTF8, 30), stats.getNumNulls(), count); } else { - throw new RuntimeException("Unknown stats type: " + stats); + return String.format("min: %s max: %s nulls: %d/%d", + stats.minAsString(), stats.maxAsString(), stats.getNumNulls(), count); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java ---------------------------------------------------------------------- diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java index 8f60821..fbeebdf 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java +++ 
b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java @@ -53,6 +53,7 @@ import org.slf4j.Logger; import javax.annotation.Nullable; import java.io.IOException; import java.util.Arrays; +import java.util.Comparator; import java.util.List; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; @@ -184,9 +185,11 @@ public class CheckParquet251Command extends BaseCommand { private final boolean hasNonNull; private final T min; private final T max; + private final Comparator<T> comparator; public StatsValidator(DataPage page) { Statistics<T> stats = getStatisticsFromPageHeader(page); + this.comparator = stats.comparator(); this.hasNonNull = stats.hasNonNullValue(); if (hasNonNull) { this.min = stats.genericGetMin(); @@ -199,10 +202,10 @@ public class CheckParquet251Command extends BaseCommand { public void validate(T value) { if (hasNonNull) { - if (min.compareTo(value) > 0) { + if (comparator.compare(min, value) > 0) { throw new BadStatsException("Min should be <= all values."); } - if (max.compareTo(value) < 0) { + if (comparator.compare(max, value) < 0) { throw new BadStatsException("Max should be >= all values."); } } @@ -343,8 +346,8 @@ public class CheckParquet251Command extends BaseCommand { console.debug(String.format( "Validated stats min=%s max=%s nulls=%d for page=%s col=%s", - String.valueOf(stats.genericGetMin()), - String.valueOf(stats.genericGetMax()), stats.getNumNulls(), page, + stats.minAsString(), + stats.maxAsString(), stats.getNumNulls(), page, Arrays.toString(desc.getPath()))); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java index 61f13a2..5f30cd0 100644 --- 
a/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnDescriptor.java @@ -20,7 +20,9 @@ package org.apache.parquet.column; import java.util.Arrays; +import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; /** * Describes a column's type as well as its position in its containing schema. @@ -31,8 +33,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; public class ColumnDescriptor implements Comparable<ColumnDescriptor> { private final String[] path; - private final PrimitiveTypeName type; - private final int typeLength; + private final PrimitiveType type; private final int maxRep; private final int maxDef; @@ -42,8 +43,10 @@ public class ColumnDescriptor implements Comparable<ColumnDescriptor> { * @param type the type of the field * @param maxRep the maximum repetition level for that path * @param maxDef the maximum definition level for that path + * @deprecated Use {@link #ColumnDescriptor(String[], PrimitiveTypeName, int, int)} */ - public ColumnDescriptor(String[] path, PrimitiveTypeName type, int maxRep, + @Deprecated + public ColumnDescriptor(String[] path, PrimitiveTypeName type, int maxRep, int maxDef) { this(path, type, 0, maxRep, maxDef); } @@ -54,13 +57,23 @@ public class ColumnDescriptor implements Comparable<ColumnDescriptor> { * @param type the type of the field * @param maxRep the maximum repetition level for that path * @param maxDef the maximum definition level for that path + * @deprecated Use {@link #ColumnDescriptor(String[], PrimitiveTypeName, int, int)} */ - public ColumnDescriptor(String[] path, PrimitiveTypeName type, + @Deprecated + public ColumnDescriptor(String[] path, PrimitiveTypeName type, int typeLength, int maxRep, int maxDef) { - super(); + this(path, new PrimitiveType(Type.Repetition.OPTIONAL, type, typeLength,""), maxRep, 
maxDef); + } + + /** + * @param path the path to the leaf field in the schema + * @param type the type of the field + * @param maxRep the maximum repetition level for that path + * @param maxDef the maximum definition level for that path + */ + public ColumnDescriptor(String[] path, PrimitiveType type, int maxRep, int maxDef) { this.path = path; this.type = type; - this.typeLength = typeLength; this.maxRep = maxRep; this.maxDef = maxDef; } @@ -88,16 +101,27 @@ public class ColumnDescriptor implements Comparable<ColumnDescriptor> { /** * @return the type of that column + * @deprecated will removed in 2.0.0. Use {@link #getPrimitiveType()} instead. */ + @Deprecated public PrimitiveTypeName getType() { - return type; + return type.getPrimitiveTypeName(); } /** * @return the size of the type + * @deprecated will removed in 2.0.0. Use {@link #getPrimitiveType()} instead. **/ + @Deprecated public int getTypeLength() { - return typeLength; + return type.getTypeLength(); + } + + /** + * @return the primitive type object of the column + */ + public PrimitiveType getPrimitiveType() { + return type; } @Override http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java index c5b3884..e274c11 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java @@ -80,7 +80,7 @@ final class ColumnWriterV1 implements ColumnWriter { } private void resetStatistics() { - this.statistics = Statistics.getStatsBasedOnType(this.path.getType()); + this.statistics = Statistics.createStats(this.path.getPrimitiveType()); } /** 
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java index c6fd91b..b50d663 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java @@ -77,7 +77,7 @@ final class ColumnWriterV2 implements ColumnWriter { } private void resetStatistics() { - this.statistics = Statistics.getStatsBasedOnType(this.path.getType()); + this.statistics = Statistics.createStats(path.getPrimitiveType()); } private void definitionLevel(int definitionLevel) { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java index c319b4a..a68285b 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java @@ -19,12 +19,38 @@ package org.apache.parquet.column.statistics; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; public class BinaryStatistics extends Statistics<Binary> { + // A fake type object to be used to generate the proper comparator + private static final PrimitiveType DEFAULT_FAKE_TYPE = Types.optional(PrimitiveType.PrimitiveTypeName.BINARY) + .named("fake_binary_type"); + 
private Binary max; private Binary min; + /** + * @deprecated will be removed in 2.0.0. Use {@link Statistics#createStats(org.apache.parquet.schema.Type)} instead + */ + @Deprecated + public BinaryStatistics() { + this(DEFAULT_FAKE_TYPE); + } + + BinaryStatistics(PrimitiveType type) { + super(type); + } + + private BinaryStatistics(BinaryStatistics other) { + super(other.type()); + if (other.hasNonNullValue()) { + initializeStats(other.min, other.max); + } + setNumNulls(other.getNumNulls()); + } + @Override public void updateStats(Binary value) { if (!this.hasNonNullValue()) { @@ -68,18 +94,14 @@ public class BinaryStatistics extends Statistics<Binary> { } @Override - public boolean isSmallerThan(long size) { - return !hasNonNullValue() || ((min.length() + max.length()) < size); + String toString(Binary value) { + // TODO: have separate toString for different logical types? + return value == null ? "null" : value.toStringUsingUTF8(); } @Override - public String toString() { - if (this.hasNonNullValue()) - return String.format("min: %s, max: %s, num_nulls: %d", min.toStringUsingUTF8(), max.toStringUsingUTF8(), this.getNumNulls()); - else if (!this.isEmpty()) - return String.format("num_nulls: %d, min/max not defined", this.getNumNulls()); - else - return "no stats for this column"; + public boolean isSmallerThan(long size) { + return !hasNonNullValue() || ((min.length() + max.length()) < size); } /** @@ -87,8 +109,8 @@ public class BinaryStatistics extends Statistics<Binary> { */ @Deprecated public void updateStats(Binary min_value, Binary max_value) { - if (min.compareTo(min_value) > 0) { min = min_value.copy(); } - if (max.compareTo(max_value) < 0) { max = max_value.copy(); } + if (comparator().compare(min, min_value) > 0) { min = min_value.copy(); } + if (comparator().compare(max, max_value) < 0) { max = max_value.copy(); } } /** @@ -136,4 +158,9 @@ public class BinaryStatistics extends Statistics<Binary> { this.min = min; this.markAsNotEmpty(); } + + @Override + 
public BinaryStatistics copy() { + return new BinaryStatistics(this); + } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java index 22c2393..0e77b61 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BooleanStatistics.java @@ -19,12 +19,38 @@ package org.apache.parquet.column.statistics; import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; public class BooleanStatistics extends Statistics<Boolean> { + // A fake type object to be used to generate the proper comparator + private static final PrimitiveType DEFAULT_FAKE_TYPE = Types.optional(PrimitiveType.PrimitiveTypeName.BOOLEAN) + .named("fake_boolean_type"); + private boolean max; private boolean min; + /** + * @deprecated will be removed in 2.0.0. 
Use {@link Statistics#createStats(org.apache.parquet.schema.Type)} instead + */ + @Deprecated + public BooleanStatistics() { + this(DEFAULT_FAKE_TYPE); + } + + BooleanStatistics(PrimitiveType type) { + super(type); + } + + private BooleanStatistics(BooleanStatistics other) { + super(other.type()); + if (other.hasNonNullValue()) { + initializeStats(other.min, other.max); + } + setNumNulls(other.getNumNulls()); + } + @Override public void updateStats(boolean value) { if (!this.hasNonNullValue()) { @@ -66,19 +92,9 @@ public class BooleanStatistics extends Statistics<Boolean> { return !hasNonNullValue() || (2 < size); } - @Override - public String toString() { - if (this.hasNonNullValue()) - return String.format("min: %b, max: %b, num_nulls: %d", min, max, this.getNumNulls()); - else if(!this.isEmpty()) - return String.format("num_nulls: %d, min/max not defined", this.getNumNulls()); - else - return "no stats for this column"; - } - public void updateStats(boolean min_value, boolean max_value) { - if (min && !min_value) { min = min_value; } - if (!max && max_value) { max = max_value; } + if (comparator().compare(min, min_value) > 0) { min = min_value; } + if (comparator().compare(max, max_value) < 0) { max = max_value; } } public void initializeStats(boolean min_value, boolean max_value) { @@ -97,6 +113,14 @@ public class BooleanStatistics extends Statistics<Boolean> { return max; } + public int compareMinToValue(boolean value) { + return comparator().compare(min, value); + } + + public int compareMaxToValue(boolean value) { + return comparator().compare(max, value); + } + public boolean getMax() { return max; } @@ -110,4 +134,9 @@ public class BooleanStatistics extends Statistics<Boolean> { this.min = min; this.markAsNotEmpty(); } + + @Override + public BooleanStatistics copy() { + return new BooleanStatistics(this); + } } 
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java index d67a550..0dd067b 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/DoubleStatistics.java @@ -19,12 +19,38 @@ package org.apache.parquet.column.statistics; import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; public class DoubleStatistics extends Statistics<Double> { + // A fake type object to be used to generate the proper comparator + private static final PrimitiveType DEFAULT_FAKE_TYPE = Types.optional(PrimitiveType.PrimitiveTypeName.DOUBLE) + .named("fake_double_type"); + private double max; private double min; + /** + * @deprecated will be removed in 2.0.0. 
Use {@link Statistics#createStats(org.apache.parquet.schema.Type)} instead + */ + @Deprecated + public DoubleStatistics() { + this(DEFAULT_FAKE_TYPE); + } + + DoubleStatistics(PrimitiveType type) { + super(type); + } + + private DoubleStatistics(DoubleStatistics other) { + super(other.type()); + if (other.hasNonNullValue()) { + initializeStats(other.min, other.max); + } + setNumNulls(other.getNumNulls()); + } + @Override public void updateStats(double value) { if (!this.hasNonNullValue()) { @@ -62,23 +88,18 @@ public class DoubleStatistics extends Statistics<Double> { } @Override - public boolean isSmallerThan(long size) { - return !hasNonNullValue() || (16 < size); + String toString(Double value) { + return String.format("%.5f", value); } @Override - public String toString() { - if(this.hasNonNullValue()) - return String.format("min: %.5f, max: %.5f, num_nulls: %d", min, max, this.getNumNulls()); - else if (!this.isEmpty()) - return String.format("num_nulls: %d, min/max not defined", this.getNumNulls()); - else - return "no stats for this column"; + public boolean isSmallerThan(long size) { + return !hasNonNullValue() || (16 < size); } public void updateStats(double min_value, double max_value) { - if (min_value < min) { min = min_value; } - if (max_value > max) { max = max_value; } + if (comparator().compare(min, min_value) > 0) { min = min_value; } + if (comparator().compare(max, max_value) < 0) { max = max_value; } } public void initializeStats(double min_value, double max_value) { @@ -97,6 +118,14 @@ public class DoubleStatistics extends Statistics<Double> { return max; } + public int compareMinToValue(double value) { + return comparator().compare(min, value); + } + + public int compareMaxToValue(double value) { + return comparator().compare(max, value); + } + public double getMax() { return max; } @@ -110,4 +139,9 @@ public class DoubleStatistics extends Statistics<Double> { this.min = min; this.markAsNotEmpty(); } + + @Override + public DoubleStatistics 
copy() { + return new DoubleStatistics(this); + } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java index dffc207..36836c6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/FloatStatistics.java @@ -19,12 +19,39 @@ package org.apache.parquet.column.statistics; import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; public class FloatStatistics extends Statistics<Float> { + // A fake type object to be used to generate the proper comparator + private static final PrimitiveType DEFAULT_FAKE_TYPE = Types.optional(PrimitiveType.PrimitiveTypeName.FLOAT) + .named("fake_float_type"); + private float max; private float min; + /** + * @deprecated will be removed in 2.0.0. 
Use {@link Statistics#createStats(org.apache.parquet.schema.Type)} instead + */ + @Deprecated + public FloatStatistics() { + // Creating a fake primitive type to have the proper comparator + this(DEFAULT_FAKE_TYPE); + } + + FloatStatistics(PrimitiveType type) { + super(type); + } + + private FloatStatistics(FloatStatistics other) { + super(other.type()); + if (other.hasNonNullValue()) { + initializeStats(other.min, other.max); + } + setNumNulls(other.getNumNulls()); + } + @Override public void updateStats(float value) { if (!this.hasNonNullValue()) { @@ -62,23 +89,18 @@ public class FloatStatistics extends Statistics<Float> { } @Override - public boolean isSmallerThan(long size) { - return !hasNonNullValue() || (8 < size); + String toString(Float value) { + return String.format("%.5f", value); } @Override - public String toString() { - if (this.hasNonNullValue()) - return String.format("min: %.5f, max: %.5f, num_nulls: %d", min, max, this.getNumNulls()); - else if (!this.isEmpty()) - return String.format("num_nulls: %d, min/max not defined", this.getNumNulls()); - else - return "no stats for this column"; + public boolean isSmallerThan(long size) { + return !hasNonNullValue() || (8 < size); } public void updateStats(float min_value, float max_value) { - if (min_value < min) { min = min_value; } - if (max_value > max) { max = max_value; } + if (comparator().compare(min, min_value) > 0) { min = min_value; } + if (comparator().compare(max, max_value) < 0) { max = max_value; } } public void initializeStats(float min_value, float max_value) { @@ -97,6 +119,14 @@ public class FloatStatistics extends Statistics<Float> { return max; } + public int compareMinToValue(float value) { + return comparator().compare(min, value); + } + + public int compareMaxToValue(float value) { + return comparator().compare(max, value); + } + public float getMax() { return max; } @@ -110,4 +140,9 @@ public class FloatStatistics extends Statistics<Float> { this.min = min; this.markAsNotEmpty(); 
} + + @Override + public FloatStatistics copy() { + return new FloatStatistics(this); + } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java index a5d7ba1..5df7f0a 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/IntStatistics.java @@ -19,12 +19,38 @@ package org.apache.parquet.column.statistics; import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; public class IntStatistics extends Statistics<Integer> { + // A fake type object to be used to generate the proper comparator + private static final PrimitiveType DEFAULT_FAKE_TYPE = Types.optional(PrimitiveType.PrimitiveTypeName.INT32) + .named("fake_int32_type"); + private int max; private int min; + /** + * @deprecated will be removed in 2.0.0. 
Use {@link Statistics#createStats(org.apache.parquet.schema.Type)} instead + */ + @Deprecated + public IntStatistics() { + this(DEFAULT_FAKE_TYPE); + } + + IntStatistics(PrimitiveType type) { + super(type); + } + + private IntStatistics(IntStatistics other) { + super(other.type()); + if (other.hasNonNullValue()) { + initializeStats(other.min, other.max); + } + setNumNulls(other.getNumNulls()); + } + @Override public void updateStats(int value) { if (!this.hasNonNullValue()) { @@ -62,23 +88,19 @@ public class IntStatistics extends Statistics<Integer> { } @Override - public boolean isSmallerThan(long size) { - return !hasNonNullValue() || (8 < size); + String toString(Integer value) { + // TODO: implement unsigned int as required + return value.toString(); } @Override - public String toString() { - if (this.hasNonNullValue()) - return String.format("min: %d, max: %d, num_nulls: %d", min, max, this.getNumNulls()); - else if (!this.isEmpty()) - return String.format("num_nulls: %d, min/max is not defined", this.getNumNulls()); - else - return "no stats for this column"; + public boolean isSmallerThan(long size) { + return !hasNonNullValue() || (8 < size); } public void updateStats(int min_value, int max_value) { - if (min_value < min) { min = min_value; } - if (max_value > max) { max = max_value; } + if (comparator().compare(min, min_value) > 0) { min = min_value; } + if (comparator().compare(max, max_value) < 0) { max = max_value; } } public void initializeStats(int min_value, int max_value) { @@ -97,6 +119,14 @@ public class IntStatistics extends Statistics<Integer> { return max; } + public int compareMinToValue(int value) { + return comparator().compare(min, value); + } + + public int compareMaxToValue(int value) { + return comparator().compare(max, value); + } + public int getMax() { return max; } @@ -110,4 +140,9 @@ public class IntStatistics extends Statistics<Integer> { this.min = min; this.markAsNotEmpty(); } + + @Override + public IntStatistics copy() { + 
return new IntStatistics(this); + } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java index f7971ef..fd6d19c 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/LongStatistics.java @@ -19,12 +19,38 @@ package org.apache.parquet.column.statistics; import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; public class LongStatistics extends Statistics<Long> { + // A fake type object to be used to generate the proper comparator + private static final PrimitiveType DEFAULT_FAKE_TYPE = Types.optional(PrimitiveType.PrimitiveTypeName.INT64) + .named("fake_int64_type"); + private long max; private long min; + /** + * @deprecated will be removed in 2.0.0. 
Use {@link Statistics#createStats(org.apache.parquet.schema.Type)} instead + */ + @Deprecated + public LongStatistics() { + this(DEFAULT_FAKE_TYPE); + } + + LongStatistics(PrimitiveType type) { + super(type); + } + + private LongStatistics(LongStatistics other) { + super(other.type()); + if (other.hasNonNullValue()) { + initializeStats(other.min, other.max); + } + setNumNulls(other.getNumNulls()); + } + @Override public void updateStats(long value) { if (!this.hasNonNullValue()) { @@ -62,23 +88,19 @@ public class LongStatistics extends Statistics<Long> { } @Override - public boolean isSmallerThan(long size) { - return !hasNonNullValue() || (16 < size); + String toString(Long value) { + // TODO: implement unsigned int as required + return value.toString(); } @Override - public String toString() { - if (this.hasNonNullValue()) - return String.format("min: %d, max: %d, num_nulls: %d", min, max, this.getNumNulls()); - else if (!this.isEmpty()) - return String.format("num_nulls: %d, min/max not defined", this.getNumNulls()); - else - return "no stats for this column"; + public boolean isSmallerThan(long size) { + return !hasNonNullValue() || (16 < size); } public void updateStats(long min_value, long max_value) { - if (min_value < min) { min = min_value; } - if (max_value > max) { max = max_value; } + if (comparator().compare(min, min_value) > 0) { min = min_value; } + if (comparator().compare(max, max_value) < 0) { max = max_value; } } public void initializeStats(long min_value, long max_value) { @@ -97,6 +119,14 @@ public class LongStatistics extends Statistics<Long> { return max; } + public int compareMinToValue(long value) { + return comparator().compare(min, value); + } + + public int compareMaxToValue(long value) { + return comparator().compare(max, value); + } + public long getMax() { return max; } @@ -110,4 +140,9 @@ public class LongStatistics extends Statistics<Long> { this.min = min; this.markAsNotEmpty(); } + + @Override + public LongStatistics copy() { + 
return new LongStatistics(this); + } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java index 30153c0..6eb2381 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java @@ -18,10 +18,15 @@ */ package org.apache.parquet.column.statistics; +import java.util.Arrays; +import java.util.Objects; + import org.apache.parquet.column.UnknownColumnTypeException; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.PrimitiveComparator; +import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; -import java.util.Arrays; +import org.apache.parquet.schema.Type; /** @@ -31,10 +36,14 @@ import java.util.Arrays; */ public abstract class Statistics<T extends Comparable<T>> { + private final PrimitiveType type; + private final PrimitiveComparator<T> comparator; private boolean hasNonNullValue; private long num_nulls; - public Statistics() { + Statistics(PrimitiveType type) { + this.type = type; + this.comparator = type.comparator(); hasNonNullValue = false; num_nulls = 0; } @@ -43,27 +52,59 @@ public abstract class Statistics<T extends Comparable<T>> { * Returns the typed statistics object based on the passed type parameter * @param type PrimitiveTypeName type of the column * @return instance of a typed statistics class + * @deprecated Use {@link #createStats(Type)} instead */ + @Deprecated public static Statistics getStatsBasedOnType(PrimitiveTypeName type) { - switch(type) { - case INT32: - return new IntStatistics(); - case INT64: - return 
new LongStatistics(); - case FLOAT: - return new FloatStatistics(); - case DOUBLE: - return new DoubleStatistics(); - case BOOLEAN: - return new BooleanStatistics(); - case BINARY: - return new BinaryStatistics(); - case INT96: - return new BinaryStatistics(); - case FIXED_LEN_BYTE_ARRAY: - return new BinaryStatistics(); - default: - throw new UnknownColumnTypeException(type); + switch (type) { + case INT32: + return new IntStatistics(); + case INT64: + return new LongStatistics(); + case FLOAT: + return new FloatStatistics(); + case DOUBLE: + return new DoubleStatistics(); + case BOOLEAN: + return new BooleanStatistics(); + case BINARY: + return new BinaryStatistics(); + case INT96: + return new BinaryStatistics(); + case FIXED_LEN_BYTE_ARRAY: + return new BinaryStatistics(); + default: + throw new UnknownColumnTypeException(type); + } + } + + /** + * Creates an empty {@code Statistics} instance for the specified type to be + * used for reading/writing the new min/max statistics used in the V2 format. 
+ * + * @param type + * type of the column + * @return instance of a typed statistics class + */ + public static Statistics<?> createStats(Type type) { + PrimitiveType primitive = type.asPrimitiveType(); + switch (primitive.getPrimitiveTypeName()) { + case INT32: + return new IntStatistics(primitive); + case INT64: + return new LongStatistics(primitive); + case FLOAT: + return new FloatStatistics(primitive); + case DOUBLE: + return new DoubleStatistics(primitive); + case BOOLEAN: + return new BooleanStatistics(primitive); + case BINARY: + case INT96: + case FIXED_LEN_BYTE_ARRAY: + return new BinaryStatistics(primitive); + default: + throw new UnknownColumnTypeException(primitive.getPrimitiveTypeName()); } } @@ -127,9 +168,10 @@ public abstract class Statistics<T extends Comparable<T>> { if (!(other instanceof Statistics)) return false; Statistics stats = (Statistics) other; - return Arrays.equals(stats.getMaxBytes(), this.getMaxBytes()) && - Arrays.equals(stats.getMinBytes(), this.getMinBytes()) && - stats.getNumNulls() == this.getNumNulls(); + return type.equals(stats.type) && + Arrays.equals(stats.getMaxBytes(), this.getMaxBytes()) && + Arrays.equals(stats.getMinBytes(), this.getMinBytes()) && + stats.getNumNulls() == this.getNumNulls(); } /** @@ -138,7 +180,8 @@ public abstract class Statistics<T extends Comparable<T>> { */ @Override public int hashCode() { - return 31 * Arrays.hashCode(getMaxBytes()) + 17 * Arrays.hashCode(getMinBytes()) + Long.valueOf(this.getNumNulls()).hashCode(); + return 31 * type.hashCode() + 31 * Arrays.hashCode(getMaxBytes()) + 17 * Arrays.hashCode(getMinBytes()) + + Long.valueOf(this.getNumNulls()).hashCode(); } /** @@ -150,14 +193,15 @@ public abstract class Statistics<T extends Comparable<T>> { public void mergeStatistics(Statistics stats) { if (stats.isEmpty()) return; - if (this.getClass() == stats.getClass()) { + // Merge stats only if they have the same type + if (type.equals(stats.type)) { incrementNumNulls(stats.getNumNulls()); 
if (stats.hasNonNullValue()) { mergeStatisticsMinMax(stats); markAsNotEmpty(); } } else { - throw new StatisticsClassException(this.getClass().toString(), stats.getClass().toString()); + throw StatisticsClassException.create(this, stats); } } @@ -175,10 +219,59 @@ public abstract class Statistics<T extends Comparable<T>> { */ abstract public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes); + /** + * Returns the min value in the statistics. The java natural order of the returned type defined by {@link + * T#compareTo(Object)} might not be the proper one. For example, UINT_32 requires unsigned comparison instead of the + * natural signed one. Use {@link #compareMinToValue(Comparable)} or the comparator returned by {@link #comparator()} to + * always get the proper ordering. + */ abstract public T genericGetMin(); + + /** + * Returns the max value in the statistics. The java natural order of the returned type defined by {@link + * T#compareTo(Object)} might not be the proper one. For example, UINT_32 requires unsigned comparison instead of the + * natural signed one. Use {@link #compareMaxToValue(Comparable)} or the comparator returned by {@link #comparator()} to + * always get the proper ordering. + */ abstract public T genericGetMax(); /** + * Returns the {@link PrimitiveComparator} implementation to be used to compare two generic values in the proper way + * (for example, unsigned comparison for UINT_32). + */ + public final PrimitiveComparator<T> comparator() { + return comparator; + } + + /** + * Compares min to the specified value in the proper way. It does the same as invoking + * {@code comparator().compare(genericGetMin(), value)}. The corresponding statistics implementations overload this + * method so the one with the primitive argument shall be used to avoid boxing/unboxing. 
+ * + * @param value + * the value which {@code min} is to be compared to + * @return a negative integer, zero, or a positive integer as {@code min} is less than, equal to, or greater than + * {@code value}. + */ + public final int compareMinToValue(T value) { + return comparator.compare(genericGetMin(), value); + } + + /** + * Compares max to the specified value in the proper way. It does the same as invoking + * {@code comparator().compare(genericGetMax(), value)}. The corresponding statistics implementations overload this + * method so the one with the primitive argument shall be used to avoid boxing/unboxing. + * + * @param value + * the value which {@code max} is to be compared to + * @return a negative integer, zero, or a positive integer as {@code max} is less than, equal to, or greater than + * {@code value}. + */ + public final int compareMaxToValue(T value) { + return comparator.compare(genericGetMax(), value); + } + + /** * Abstract method to return the max value as a byte array * @return byte array corresponding to the max value */ @@ -191,6 +284,24 @@ public abstract class Statistics<T extends Comparable<T>> { abstract public byte[] getMinBytes(); /** + * Returns the string representation of min for debugging/logging purposes. + */ + public String minAsString() { + return toString(genericGetMin()); + } + + /** + * Returns the string representation of max for debugging/logging purposes. + */ + public String maxAsString() { + return toString(genericGetMax()); + } + + String toString(T value) { + return Objects.toString(value); + } + + /** * Abstract method to return whether the min and max values fit in the given * size. 
* @param size a size in bytes @@ -198,11 +309,15 @@ public abstract class Statistics<T extends Comparable<T>> { */ abstract public boolean isSmallerThan(long size); - /** - * toString() to display min, max, num_nulls in a string - */ - abstract public String toString(); - + @Override + public String toString() { + if (this.hasNonNullValue()) + return String.format("min: %s, max: %s, num_nulls: %d", minAsString(), maxAsString(), this.getNumNulls()); + else if (!this.isEmpty()) + return String.format("num_nulls: %d, min/max not defined", this.getNumNulls()); + else + return "no stats for this column"; + } /** * Increments the null count by one @@ -250,13 +365,25 @@ public abstract class Statistics<T extends Comparable<T>> { public boolean hasNonNullValue() { return hasNonNullValue; } - + /** * Sets the page/column as having a valid non-null value * kind of misnomer here - */ + */ protected void markAsNotEmpty() { hasNonNullValue = true; } + + /** + * @return a new independent statistics instance of this class. 
+ */ + public abstract Statistics<T> copy(); + + /** + * @return the primitive type object which this statistics is created for + */ + public PrimitiveType type() { + return type; + } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java index a242737..4c23101 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/StatisticsClassException.java @@ -29,6 +29,18 @@ public class StatisticsClassException extends ParquetRuntimeException { private static final long serialVersionUID = 1L; public StatisticsClassException(String className1, String className2) { - super("Statistics classes mismatched: " + className1 + " vs. " + className2); + this("Statistics classes mismatched: " + className1 + " vs. " + className2); + } + + private StatisticsClassException(String msg) { + super(msg); + } + + static StatisticsClassException create(Statistics<?> stats1, Statistics<?> stats2) { + if (stats1.getClass() != stats2.getClass()) { + return new StatisticsClassException(stats1.getClass().toString(), stats2.getClass().toString()); + } + return new StatisticsClassException( + "Statistics comparator mismatched: " + stats1.comparator() + " vs. 
" + stats2.comparator()); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Statistics.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Statistics.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Statistics.java index 22e4027..8df0250 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Statistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Statistics.java @@ -18,6 +18,8 @@ */ package org.apache.parquet.filter2.predicate; +import java.util.Comparator; + import static org.apache.parquet.Preconditions.checkNotNull; /** @@ -26,17 +28,51 @@ import static org.apache.parquet.Preconditions.checkNotNull; public class Statistics<T> { private final T min; private final T max; + private final Comparator<T> comparator; + // Intended for use only within Parquet itself. + /** + * @deprecated will be removed in 2.0.0. Use {@link #Statistics(Object, Object, Comparator)} instead + */ + @Deprecated public Statistics(T min, T max) { this.min = checkNotNull(min, "min"); this.max = checkNotNull(max, "max"); + this.comparator = null; } + // Intended for use only within Parquet itself. + public Statistics(T min, T max, Comparator<T> comparator) { + this.min = checkNotNull(min, "min"); + this.max = checkNotNull(max, "max"); + this.comparator = checkNotNull(comparator, "comparator"); + } + + /** + * Returns the generic object representing the min value in the statistics. The + * natural ordering of type {@code T} defined by the {@code compareTo} method + * might not be appropriate for the actual logical type. Use + * {@link #getComparator()} for comparing. + */ public T getMin() { return min; } + /** + * Returns the generic object representing the max value in the statistics. 
The + * natural ordering of type {@code T} defined by the {@code compareTo} method + * might not be appropriate for the actual logical type. Use + * {@link #getComparator()} for comparing. + */ public T getMax() { return max; } + + /** + * Returns the comparator to be used to compare two generic values in the proper way (e.g. unsigned comparison for + * UINT_32) + */ + public Comparator<T> getComparator() { + return comparator; + } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java index 8def88e..c1f759c 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor; @@ -30,6 +31,8 @@ import org.apache.parquet.filter2.predicate.Operators.And; import org.apache.parquet.filter2.predicate.Operators.Not; import org.apache.parquet.filter2.predicate.Operators.Or; import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector; +import org.apache.parquet.io.PrimitiveColumnIO; +import org.apache.parquet.schema.PrimitiveComparator; import static 
org.apache.parquet.Preconditions.checkArgument; @@ -55,9 +58,20 @@ import static org.apache.parquet.Preconditions.checkArgument; public abstract class IncrementallyUpdatedFilterPredicateBuilderBase implements Visitor<IncrementallyUpdatedFilterPredicate> { private boolean built = false; private final Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn = new HashMap<ColumnPath, List<ValueInspector>>(); + private final Map<ColumnPath, PrimitiveComparator<?>> comparatorsByColumn = new HashMap<>(); + @Deprecated public IncrementallyUpdatedFilterPredicateBuilderBase() { } + public IncrementallyUpdatedFilterPredicateBuilderBase(List<PrimitiveColumnIO> leaves) { + for (PrimitiveColumnIO leaf : leaves) { + ColumnDescriptor descriptor = leaf.getColumnDescriptor(); + ColumnPath path = ColumnPath.get(descriptor.getPath()); + PrimitiveComparator<?> comparator = descriptor.getPrimitiveType().comparator(); + comparatorsByColumn.put(path, comparator); + } + } + public final IncrementallyUpdatedFilterPredicate build(FilterPredicate pred) { checkArgument(!built, "This builder has already been used"); IncrementallyUpdatedFilterPredicate incremental = pred.accept(this); @@ -78,6 +92,11 @@ public abstract class IncrementallyUpdatedFilterPredicateBuilderBase implements return valueInspectorsByColumn; } + @SuppressWarnings("unchecked") + protected final <T> PrimitiveComparator<T> getComparator(ColumnPath path) { + return (PrimitiveComparator<T>) comparatorsByColumn.get(path); + } + @Override public final IncrementallyUpdatedFilterPredicate visit(And and) { return new IncrementallyUpdatedFilterPredicate.And(and.getLeft().accept(this), and.getRight().accept(this)); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java 
b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java index 67efdb3..7346c5a 100644 --- a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java +++ b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java @@ -109,7 +109,7 @@ public class MessageColumnIO extends GroupColumnIO { public RecordReader<T> visit(FilterPredicateCompat filterPredicateCompat) { FilterPredicate predicate = filterPredicateCompat.getFilterPredicate(); - IncrementallyUpdatedFilterPredicateBuilder builder = new IncrementallyUpdatedFilterPredicateBuilder(); + IncrementallyUpdatedFilterPredicateBuilder builder = new IncrementallyUpdatedFilterPredicateBuilder(leaves); IncrementallyUpdatedFilterPredicate streamingPredicate = builder.build(predicate); RecordMaterializer<T> filteringRecordMaterializer = new FilteringRecordMaterializer<T>( recordMaterializer, http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java index 15c28c8..e40b24f 100644 --- a/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java +++ b/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java @@ -52,10 +52,9 @@ public class PrimitiveColumnIO extends ColumnIO { super.setLevels(r, d, fieldPath, fieldIndexPath, repetition, path); PrimitiveType type = getType().asPrimitiveType(); this.columnDescriptor = new ColumnDescriptor( - fieldPath, - type.getPrimitiveTypeName(), - type.getTypeLength(), - getRepetitionLevel(), + fieldPath, + type, + getRepetitionLevel(), getDefinitionLevel()); this.path = path.toArray(new ColumnIO[path.size()]); } 
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java index 50b98c2..9f5f0f2 100644 --- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java +++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java @@ -33,6 +33,7 @@ import java.util.Arrays; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.ParquetEncodingException; +import org.apache.parquet.schema.PrimitiveComparator; import static org.apache.parquet.bytes.BytesUtils.UTF8; @@ -71,12 +72,14 @@ abstract public class Binary implements Comparable<Binary>, Serializable { abstract boolean equals(Binary other); + /** + * @deprecated will be removed in 2.0.0. The comparison logic depends on the related logical type therefore this one + * might not be correct. The {@link java.util.Comparator} implementation for the related type available at + * {@link Type#comparator()} shall be used instead. 
+ */ + @Deprecated abstract public int compareTo(Binary other); - abstract int compareTo(byte[] bytes, int offset, int length); - - abstract int compareTo(ByteBuffer bytes, int offset, int length); - abstract public ByteBuffer toByteBuffer(); @Override @@ -189,17 +192,7 @@ abstract public class Binary implements Comparable<Binary>, Serializable { @Override public int compareTo(Binary other) { - return other.compareTo(value, offset, length); - } - - @Override - int compareTo(byte[] other, int otherOffset, int otherLength) { - return Binary.compareTwoByteArrays(value, offset, length, other, otherOffset, otherLength); - } - - @Override - int compareTo(ByteBuffer bytes, int otherOffset, int otherLength) { - return Binary.compareByteArrayToByteBuffer(value, offset, length, bytes, otherOffset, otherLength); + return PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR.compare(this, other); } @Override @@ -345,20 +338,10 @@ abstract public class Binary implements Comparable<Binary>, Serializable { @Override public int compareTo(Binary other) { - return other.compareTo(value, 0, value.length); + return PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR.compare(this, other); } - @Override - int compareTo(byte[] other, int otherOffset, int otherLength) { - return Binary.compareTwoByteArrays(value, 0, value.length, other, otherOffset, otherLength); - } - - @Override - int compareTo(ByteBuffer bytes, int otherOffset, int otherLength) { - return Binary.compareByteArrayToByteBuffer(value, 0, value.length, bytes, otherOffset, otherLength); - } - - @Override + @Override public ByteBuffer toByteBuffer() { return ByteBuffer.wrap(value); } @@ -505,31 +488,12 @@ abstract public class Binary implements Comparable<Binary>, Serializable { @Override public int compareTo(Binary other) { - if (value.hasArray()) { - return other.compareTo(value.array(), value.arrayOffset() + offset, length); - } else { - return other.compareTo(value, offset, length); - } - } - - 
@Override - int compareTo(byte[] other, int otherOffset, int otherLength) { - if (value.hasArray()) { - return Binary.compareTwoByteArrays(value.array(), value.arrayOffset() + offset, length, - other, otherOffset, otherLength); - } { - return Binary.compareByteBufferToByteArray(value, offset, length, other, otherOffset, otherLength); - } - } - - @Override - int compareTo(ByteBuffer bytes, int otherOffset, int otherLength) { - return Binary.compareTwoByteBuffers(value, offset, length, bytes, otherOffset, otherLength); + return PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR.compare(this, other); } @Override public ByteBuffer toByteBuffer() { - ByteBuffer ret = value.slice(); + ByteBuffer ret = value.duplicate(); ret.position(offset); ret.limit(offset + length); return ret; @@ -665,64 +629,4 @@ abstract public class Binary implements Comparable<Binary>, Serializable { } return true; } - - private static final int compareByteBufferToByteArray(ByteBuffer buf, int offset1, int length1, - byte[] array, int offset2, int length2) { - return -1 * Binary.compareByteArrayToByteBuffer(array, offset1, length1, buf, offset2, length2); - } - - private static final int compareByteArrayToByteBuffer(byte[] array1, int offset1, int length1, - ByteBuffer buf, int offset2, int length2) { - if (array1 == null && buf == null) return 0; - int min_length = (length1 < length2) ? length1 : length2; - for (int i = 0; i < min_length; i++) { - if (array1[i + offset1] < buf.get(i + offset2)) { - return 1; - } - if (array1[i + offset1] > buf.get(i + offset2)) { - return -1; - } - } - // check remainder - if (length1 == length2) { return 0; } - else if (length1 < length2) { return 1;} - else { return -1; } - } - - private static final int compareTwoByteBuffers(ByteBuffer buf1, int offset1, int length1, - ByteBuffer buf2, int offset2, int length2) { - if (buf1 == null && buf2 == null) return 0; - int min_length = (length1 < length2) ? 
length1 : length2; - for (int i = 0; i < min_length; i++) { - if (buf1.get(i + offset1) < buf2.get(i + offset2)) { - return 1; - } - if (buf1.get(i + offset1) > buf2.get(i + offset2)) { - return -1; - } - } - // check remainder - if (length1 == length2) { return 0; } - else if (length1 < length2) { return 1;} - else { return -1; } - } - - private static final int compareTwoByteArrays(byte[] array1, int offset1, int length1, - byte[] array2, int offset2, int length2) { - if (array1 == null && array2 == null) return 0; - if (array1 == array2 && offset1 == offset2 && length1 == length2) return 0; - int min_length = (length1 < length2) ? length1 : length2; - for (int i = 0; i < min_length; i++) { - if (array1[i + offset1] < array2[i + offset2]) { - return 1; - } - if (array1[i + offset1] > array2[i + offset2]) { - return -1; - } - } - // check remainder - if (length1 == length2) { return 0; } - else if (length1 < length2) { return 1;} - else { return -1; } - } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java b/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java new file mode 100644 index 0000000..144a93a --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/schema/ColumnOrder.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.schema; + +import org.apache.parquet.Preconditions; + +/** + * Class representing the column order with all the related parameters. + */ +public class ColumnOrder { + /** + * The enum type of the column order. + */ + public enum ColumnOrderName { + /** + * Representing the case when the defined column order is undefined (e.g. the file is written by a later API and the + * current one does not support the related column order). No statistics will be written/read in this case. + */ + UNDEFINED, + /** + * Type defined order meaning that the comparison order of the elements are based on its type. 
+ */ + TYPE_DEFINED_ORDER + } + + private static final ColumnOrder UNDEFINED_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.UNDEFINED); + private static final ColumnOrder TYPE_DEFINED_COLUMN_ORDER = new ColumnOrder(ColumnOrderName.TYPE_DEFINED_ORDER); + + /** + * @return a {@link ColumnOrder} instance representing an undefined order + * @see ColumnOrderName#UNDEFINED + */ + public static ColumnOrder undefined() { + return UNDEFINED_COLUMN_ORDER; + } + + /** + * @return a {@link ColumnOrder} instance representing a type defined order + * @see ColumnOrderName#TYPE_DEFINED_ORDER + */ + public static ColumnOrder typeDefined() { + return TYPE_DEFINED_COLUMN_ORDER; + } + + private final ColumnOrderName columnOrderName; + + private ColumnOrder(ColumnOrderName columnOrderName) { + this.columnOrderName = Preconditions.checkNotNull(columnOrderName, "columnOrderName"); + } + + public ColumnOrderName getColumnOrderName() { + return columnOrderName; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object obj) { + if (obj instanceof ColumnOrder) { + return columnOrderName == ((ColumnOrder) obj).columnOrderName; + } + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() { + return columnOrderName.hashCode(); + } + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + return columnOrderName.toString(); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java index 1e26ed2..afbc416 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java @@ -95,8 +95,7 @@ public final class MessageType extends 
GroupType { int maxRep = getMaxRepetitionLevel(path); int maxDef = getMaxDefinitionLevel(path); PrimitiveType type = getType(path).asPrimitiveType(); - return new ColumnDescriptor(path, type.getPrimitiveTypeName(), - type.getTypeLength(), maxRep, maxDef); + return new ColumnDescriptor(path, type, maxRep, maxDef); } public List<String[]> getPaths() { @@ -111,8 +110,7 @@ public final class MessageType extends GroupType { PrimitiveType primitiveType = getType(path).asPrimitiveType(); columns.add(new ColumnDescriptor( path, - primitiveType.getPrimitiveTypeName(), - primitiveType.getTypeLength(), + primitiveType, getMaxRepetitionLevel(path), getMaxDefinitionLevel(path))); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/c6764c4a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java new file mode 100644 index 0000000..085a67a --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveComparator.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.parquet.schema;

import org.apache.parquet.io.api.Binary;

import java.nio.ByteBuffer;
import java.util.Comparator;

/**
 * {@link Comparator} implementation that also supports the comparison of the related primitive type to avoid the
 * performance penalty of boxing/unboxing. The {@code compare} methods for the unsupported primitive types throw
 * {@link UnsupportedOperationException}.
 * <p>
 * The object-based {@link #compare(Object, Object)} orders {@code null} before every non-null value and delegates the
 * comparison of two non-null values to {@link #compareNotNulls(Object, Object)}.
 *
 * @param <T> the (boxed) type of the values compared by this comparator
 */
public abstract class PrimitiveComparator<T> implements Comparator<T> {

  /**
   * Compares two {@code boolean} values without boxing.
   *
   * @throws UnsupportedOperationException if this is not a boolean comparator
   */
  public int compare(boolean b1, boolean b2) {
    throw new UnsupportedOperationException(
        "compare(boolean, boolean) was called on a non-boolean comparator: " + toString());
  }

  /**
   * Compares two {@code int} values without boxing.
   *
   * @throws UnsupportedOperationException if this is not an int comparator
   */
  public int compare(int i1, int i2) {
    throw new UnsupportedOperationException("compare(int, int) was called on a non-int comparator: " + toString());
  }

  /**
   * Compares two {@code long} values without boxing.
   *
   * @throws UnsupportedOperationException if this is not a long comparator
   */
  public int compare(long l1, long l2) {
    throw new UnsupportedOperationException("compare(long, long) was called on a non-long comparator: " + toString());
  }

  /**
   * Compares two {@code float} values without boxing.
   *
   * @throws UnsupportedOperationException if this is not a float comparator
   */
  public int compare(float f1, float f2) {
    throw new UnsupportedOperationException(
        "compare(float, float) was called on a non-float comparator: " + toString());
  }

  /**
   * Compares two {@code double} values without boxing.
   *
   * @throws UnsupportedOperationException if this is not a double comparator
   */
  public int compare(double d1, double d2) {
    throw new UnsupportedOperationException(
        "compare(double, double) was called on a non-double comparator: " + toString());
  }

  /**
   * Null-safe comparison: two {@code null}s are equal and {@code null} sorts before any non-null value.
   */
  @Override
  public final int compare(T o1, T o2) {
    if (o1 == null) {
      return o2 == null ? 0 : -1;
    }
    return o2 == null ? 1 : compareNotNulls(o1, o2);
  }

  /**
   * Compares two non-null values; called by {@link #compare(Object, Object)} after the null checks.
   */
  abstract int compareNotNulls(T o1, T o2);

  /** Boolean comparator delegating to {@link Boolean#compare(boolean, boolean)}. */
  static final PrimitiveComparator<Boolean> BOOLEAN_COMPARATOR = new PrimitiveComparator<Boolean>() {
    @Override
    int compareNotNulls(Boolean o1, Boolean o2) {
      return compare(o1.booleanValue(), o2.booleanValue());
    }

    @Override
    public int compare(boolean b1, boolean b2) {
      return Boolean.compare(b1, b2);
    }

    @Override
    public String toString() {
      return "BOOLEAN_COMPARATOR";
    }
  };

  /** Base of the int comparators: unboxes and delegates to {@link #compare(int, int)}. */
  private static abstract class IntComparator extends PrimitiveComparator<Integer> {
    @Override
    int compareNotNulls(Integer o1, Integer o2) {
      return compare(o1.intValue(), o2.intValue());
    }
  }

  /** Compares 32 bit integers as signed values. */
  static final PrimitiveComparator<Integer> SIGNED_INT32_COMPARATOR = new IntComparator() {
    @Override
    public int compare(int i1, int i2) {
      return Integer.compare(i1, i2);
    }

    @Override
    public String toString() {
      return "SIGNED_INT32_COMPARATOR";
    }
  };

  /** Compares 32 bit integers as unsigned values. */
  static final PrimitiveComparator<Integer> UNSIGNED_INT32_COMPARATOR = new IntComparator() {
    @Override
    public int compare(int i1, int i2) {
      // Implemented based on com.google.common.primitives.UnsignedInts.compare(int, int):
      // flipping the sign bit maps the unsigned order onto the signed order
      return Integer.compare(i1 ^ Integer.MIN_VALUE, i2 ^ Integer.MIN_VALUE);
    }

    @Override
    public String toString() {
      return "UNSIGNED_INT32_COMPARATOR";
    }
  };

  /** Base of the long comparators: unboxes and delegates to {@link #compare(long, long)}. */
  private static abstract class LongComparator extends PrimitiveComparator<Long> {
    @Override
    int compareNotNulls(Long o1, Long o2) {
      return compare(o1.longValue(), o2.longValue());
    }
  }

  /** Compares 64 bit integers as signed values. */
  static final PrimitiveComparator<Long> SIGNED_INT64_COMPARATOR = new LongComparator() {
    @Override
    public int compare(long l1, long l2) {
      return Long.compare(l1, l2);
    }

    @Override
    public String toString() {
      return "SIGNED_INT64_COMPARATOR";
    }
  };

  /** Compares 64 bit integers as unsigned values. */
  static final PrimitiveComparator<Long> UNSIGNED_INT64_COMPARATOR = new LongComparator() {
    @Override
    public int compare(long l1, long l2) {
      // Implemented based on com.google.common.primitives.UnsignedLongs.compare(long, long):
      // flipping the sign bit maps the unsigned order onto the signed order
      return Long.compare(l1 ^ Long.MIN_VALUE, l2 ^ Long.MIN_VALUE);
    }

    @Override
    public String toString() {
      return "UNSIGNED_INT64_COMPARATOR";
    }
  };

  /** Float comparator delegating to {@link Float#compare(float, float)}. */
  static final PrimitiveComparator<Float> FLOAT_COMPARATOR = new PrimitiveComparator<Float>() {
    @Override
    int compareNotNulls(Float o1, Float o2) {
      return compare(o1.floatValue(), o2.floatValue());
    }

    @Override
    public int compare(float f1, float f2) {
      return Float.compare(f1, f2);
    }

    @Override
    public String toString() {
      return "FLOAT_COMPARATOR";
    }
  };

  /** Double comparator delegating to {@link Double#compare(double, double)}. */
  static final PrimitiveComparator<Double> DOUBLE_COMPARATOR = new PrimitiveComparator<Double>() {
    @Override
    int compareNotNulls(Double o1, Double o2) {
      return compare(o1.doubleValue(), o2.doubleValue());
    }

    @Override
    public int compare(double d1, double d2) {
      return Double.compare(d1, d2);
    }

    @Override
    public String toString() {
      return "DOUBLE_COMPARATOR";
    }
  };

  /**
   * Base of the {@link Binary} comparators: compares the bytes exposed by {@link Binary#toByteBuffer()}.
   * Implementations read the bytes between each buffer's position and limit with absolute gets, so the buffer
   * positions are left untouched.
   */
  private static abstract class BinaryComparator extends PrimitiveComparator<Binary> {
    @Override
    int compareNotNulls(Binary o1, Binary o2) {
      return compare(o1.toByteBuffer(), o2.toByteBuffer());
    }

    abstract int compare(ByteBuffer b1, ByteBuffer b2);

    /** Returns the value of {@code b} interpreted as an unsigned byte (0..255). */
    final int toUnsigned(byte b) {
      return b & 0xFF;
    }
  }

  /**
   * Compares the bytes as unsigned values in lexicographical order; when one value is a prefix of the other, the
   * shorter one sorts first.
   */
  public static final PrimitiveComparator<Binary> UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR = new BinaryComparator() {
    @Override
    int compare(ByteBuffer b1, ByteBuffer b2) {
      int l1 = b1.remaining();
      int l2 = b2.remaining();
      int p1 = b1.position();
      int p2 = b2.position();
      int minL = Math.min(l1, l2);

      for (int i = 0; i < minL; ++i) {
        int result = unsignedCompare(b1.get(p1 + i), b2.get(p2 + i));
        if (result != 0) {
          return result;
        }
      }

      // Common prefix is equal: the shorter value sorts first (lengths are non-negative, so no overflow)
      return l1 - l2;
    }

    private int unsignedCompare(byte b1, byte b2) {
      return toUnsigned(b1) - toUnsigned(b2);
    }

    @Override
    public String toString() {
      return "UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR";
    }
  };

  /*
   * This comparator is for comparing two signed integer values (e.g. unscaled decimals) represented in big-endian
   * two's-complement binary. If one value is shorter than the other, it is treated as if it were sign-extended to the
   * longer length: padded with 0xFF bytes for negative and 0x00 bytes for non-negative values. An empty value is
   * treated as non-negative.
   */
  static final PrimitiveComparator<Binary> BINARY_AS_SIGNED_INTEGER_COMPARATOR = new BinaryComparator() {
    private static final int NEGATIVE_PADDING = 0xFF;
    private static final int POSITIVE_PADDING = 0;

    @Override
    int compare(ByteBuffer b1, ByteBuffer b2) {
      int l1 = b1.remaining();
      int l2 = b2.remaining();
      int p1 = b1.position();
      int p2 = b2.position();

      // The sign is carried by the most significant bit of the first byte
      boolean isNegative1 = l1 > 0 && b1.get(p1) < 0;
      boolean isNegative2 = l2 > 0 && b2.get(p2) < 0;
      if (isNegative1 != isNegative2) {
        return isNegative1 ? -1 : 1;
      }

      // Both values have the same sign here, so the shorter one is extended with the same padding byte
      int padding = isNegative1 ? NEGATIVE_PADDING : POSITIVE_PADDING;
      int result = 0;

      // Compare the extra leading bytes of the longer buffer with the padding of the shorter one
      if (l1 < l2) {
        int lengthDiff = l2 - l1;
        result = -compareWithPadding(lengthDiff, b2, p2, padding);
        p2 += lengthDiff;
      } else if (l1 > l2) {
        int lengthDiff = l1 - l2;
        result = compareWithPadding(lengthDiff, b1, p1, padding);
        p1 += lengthDiff;
      }

      // The leading bytes of the longer buffer equal the padding, or the lengths are equal. Exactly min(l1, l2)
      // bytes remain in both buffers; comparing more would read past the shorter buffer's limit.
      if (result == 0) {
        result = compareBytes(Math.min(l1, l2), b1, p1, b2, p2);
      }
      return result;
    }

    /** Compares {@code length} bytes of {@code b} starting at {@code p} with the given padding byte value. */
    private int compareWithPadding(int length, ByteBuffer b, int p, int paddingByte) {
      for (int i = p, n = p + length; i < n; ++i) {
        int result = toUnsigned(b.get(i)) - paddingByte;
        if (result != 0) {
          return result;
        }
      }
      return 0;
    }

    /** Compares {@code length} bytes of both buffers as unsigned values, starting at {@code p1} and {@code p2}. */
    private int compareBytes(int length, ByteBuffer b1, int p1, ByteBuffer b2, int p2) {
      for (int i = 0; i < length; ++i) {
        int result = toUnsigned(b1.get(p1 + i)) - toUnsigned(b2.get(p2 + i));
        if (result != 0) {
          return result;
        }
      }
      return 0;
    }

    @Override
    public String toString() {
      return "BINARY_AS_SIGNED_INTEGER_COMPARATOR";
    }
  };
}