This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 005f4d9ca4cec0646ce2dd15399603a57a86460d
Author: Paul Rogers <[email protected]>
AuthorDate: Tue Dec 10 20:13:30 2019 -0800

    DRILL-7479: Partial fixes for metadata parameterized type issues
    
    See DRILL-7479 and DRILL-7480 for an explanation. Adds generic
    type parameters where needed to avoid the need to suppress
    warnings. However, type parameters are probably not needed
    at all and should be removed in the future for reasons explained
    in DRILL-7480.
    
    closes #1923
---
 .../drill/exec/expr/ComparisonPredicate.java       | 14 +++---
 .../org/apache/drill/exec/expr/IsPredicate.java    |  9 ++--
 .../apache/drill/exec/expr/StatisticsProvider.java | 18 ++++----
 .../MetastoreParquetTableMetadataProvider.java     |  8 ++--
 .../metastore/SimpleFileTableMetadataProvider.java |  8 ++--
 .../base/AbstractGroupScanWithMetadata.java        | 10 ++---
 .../impl/metadata/MetadataControllerBatch.java     | 50 ++++++++++------------
 .../impl/metadata/MetadataHandlerBatch.java        |  7 ++-
 .../drill/exec/planner/common/DrillStatsTable.java | 12 +++---
 .../planner/cost/DrillRelMdDistinctRowCount.java   |  2 +-
 .../exec/planner/cost/DrillRelMdSelectivity.java   |  9 ++--
 .../store/parquet/AbstractParquetGroupScan.java    |  4 +-
 .../parquet/AbstractParquetScanBatchCreator.java   |  6 +--
 .../store/parquet/BaseParquetMetadataProvider.java | 17 ++++----
 .../exec/store/parquet/FilterEvaluatorUtils.java   | 13 +++---
 .../drill/exec/store/parquet/ParquetGroupScan.java |  9 ++--
 .../store/parquet/ParquetGroupScanStatistics.java  | 13 +++---
 .../exec/store/parquet/ParquetPushDownFilter.java  |  8 ++--
 .../store/parquet/ParquetTableMetadataUtils.java   | 31 +++++++-------
 .../exec/sql/TestInfoSchemaWithMetastore.java      | 10 ++---
 .../drill/exec/sql/TestMetastoreCommands.java      | 44 ++++++++++---------
 .../exec/store/parquet/TestFileGenerator.java      | 29 -------------
 .../store/parquet/TestParquetFilterPushDown.java   | 24 +++++------
 .../components/tables/BasicTablesTransformer.java  |  3 ++
 .../components/tables/TableMetadataUnit.java       |  2 +
 .../metastore/exceptions/MetastoreException.java   |  6 ++-
 .../drill/metastore/metadata/BaseMetadata.java     | 28 ++++++------
 .../metastore/metadata/BaseTableMetadata.java      | 12 +++---
 .../apache/drill/metastore/metadata/Metadata.java  |  6 +--
 .../metadata/NonInterestingColumnsMetadata.java    | 13 +++---
 .../drill/metastore/metadata/TableMetadata.java    |  3 +-
 .../CollectableColumnStatisticsKind.java           |  2 +-
 .../metastore/statistics/ColumnStatistics.java     | 30 +++++++------
 .../metastore/statistics/ColumnStatisticsKind.java | 31 ++++++++------
 .../metastore/statistics/StatisticsHolder.java     | 12 +++---
 .../drill/metastore/util/TableMetadataUtils.java   | 44 ++++++++++++-------
 .../tables/TestTableMetadataUnitConversion.java    |  4 +-
 .../metastore/metadata/MetadataSerDeTest.java      | 16 +++----
 38 files changed, 287 insertions(+), 280 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ComparisonPredicate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ComparisonPredicate.java
index fa51780..3594a6d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ComparisonPredicate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ComparisonPredicate.java
@@ -88,11 +88,11 @@ public class ComparisonPredicate<C extends Comparable<C>> 
extends LogicalExpress
   @Override
   @SuppressWarnings("unchecked")
   public RowsMatch matches(StatisticsProvider<C> evaluator) {
-    ColumnStatistics leftStat = left.accept(evaluator, null);
+    ColumnStatistics<C> leftStat = (ColumnStatistics<C>) 
left.accept(evaluator, null);
     if (IsPredicate.isNullOrEmpty(leftStat)) {
       return RowsMatch.SOME;
     }
-    ColumnStatistics rightStat = right.accept(evaluator, null);
+    ColumnStatistics<C> rightStat = (ColumnStatistics<C>) 
right.accept(evaluator, null);
     if (IsPredicate.isNullOrEmpty(rightStat)) {
       return RowsMatch.SOME;
     }
@@ -112,9 +112,11 @@ public class ComparisonPredicate<C extends Comparable<C>> 
extends LogicalExpress
       int leftScale = left.getMajorType().getScale();
       int rightScale = right.getMajorType().getScale();
       if (leftScale > rightScale) {
-        rightStat = adjustDecimalStatistics(rightStat, leftScale - rightScale);
+        rightStat = (ColumnStatistics<C>) adjustDecimalStatistics(
+            (ColumnStatistics<BigInteger>) rightStat, leftScale - rightScale);
       } else if (leftScale < rightScale) {
-        leftStat = adjustDecimalStatistics(leftStat, rightScale - leftScale);
+        leftStat = (ColumnStatistics<C>) adjustDecimalStatistics(
+            (ColumnStatistics<BigInteger>) leftStat, rightScale - leftScale);
       }
     }
     return predicate.apply(leftStat, rightStat);
@@ -127,7 +129,7 @@ public class ComparisonPredicate<C extends Comparable<C>> 
extends LogicalExpress
    * @param scale adjustment scale
    * @return adjusted statistics
    */
-  private ColumnStatistics 
adjustDecimalStatistics(ColumnStatistics<BigInteger> statistics, int scale) {
+  private ColumnStatistics<BigInteger> 
adjustDecimalStatistics(ColumnStatistics<BigInteger> statistics, int scale) {
     BigInteger min = new 
BigDecimal(ColumnStatisticsKind.MIN_VALUE.getValueStatistic(statistics))
         .setScale(scale, RoundingMode.HALF_UP).unscaledValue();
     BigInteger max = new 
BigDecimal(ColumnStatisticsKind.MAX_VALUE.getValueStatistic(statistics))
@@ -139,7 +141,7 @@ public class ComparisonPredicate<C extends Comparable<C>> 
extends LogicalExpress
   /**
    * If one rowgroup contains some null values, change the RowsMatch.ALL into 
RowsMatch.SOME (null values should be discarded by filter)
    */
-  private static RowsMatch checkNull(ColumnStatistics leftStat, 
ColumnStatistics rightStat) {
+  private static RowsMatch checkNull(ColumnStatistics<?> leftStat, 
ColumnStatistics<?> rightStat) {
     return !IsPredicate.hasNoNulls(leftStat) || 
!IsPredicate.hasNoNulls(rightStat) ? RowsMatch.SOME : RowsMatch.ALL;
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java
index aa0b9fd..def1d3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/IsPredicate.java
@@ -63,7 +63,8 @@ public class IsPredicate<C extends Comparable<C>> extends 
LogicalExpressionBase
    */
   @Override
   public RowsMatch matches(StatisticsProvider<C> evaluator) {
-    ColumnStatistics<C> exprStat = expr.accept(evaluator, null);
+    @SuppressWarnings("unchecked")
+    ColumnStatistics<C> exprStat = (ColumnStatistics<C>) 
expr.accept(evaluator, null);
     return isNullOrEmpty(exprStat) ? RowsMatch.SOME : 
predicate.apply(exprStat, evaluator);
   }
 
@@ -71,7 +72,7 @@ public class IsPredicate<C extends Comparable<C>> extends 
LogicalExpressionBase
    * @param stat statistics object
    * @return <tt>true</tt> if the input stat object is null or has invalid 
statistics; false otherwise
    */
-  public static boolean isNullOrEmpty(ColumnStatistics stat) {
+  public static boolean isNullOrEmpty(ColumnStatistics<?> stat) {
     return stat == null
         || !stat.contains(ColumnStatisticsKind.MIN_VALUE)
         || !stat.contains(ColumnStatisticsKind.MAX_VALUE)
@@ -85,7 +86,7 @@ public class IsPredicate<C extends Comparable<C>> extends 
LogicalExpressionBase
    * If it contains some null values, then we change the RowsMatch.ALL into 
RowsMatch.SOME, which sya that maybe
    * some values (the null ones) should be disgarded.
    */
-  private static RowsMatch checkNull(ColumnStatistics exprStat) {
+  private static RowsMatch checkNull(ColumnStatistics<?> exprStat) {
     return hasNoNulls(exprStat) ? RowsMatch.ALL : RowsMatch.SOME;
   }
 
@@ -95,7 +96,7 @@ public class IsPredicate<C extends Comparable<C>> extends 
LogicalExpressionBase
    * @param stat column statistics
    * @return <tt>true</tt> if the statistics does not have nulls and 
<tt>false</tt> otherwise
    */
-  static boolean hasNoNulls(ColumnStatistics stat) {
+  static boolean hasNoNulls(ColumnStatistics<?> stat) {
     return ColumnStatisticsKind.NULLS_COUNT.getFrom(stat) == 0;
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java
index 5b3cfc8..1ab7579 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java
@@ -48,12 +48,12 @@ import java.util.EnumSet;
 import java.util.Map;
 import java.util.Set;
 
-public class StatisticsProvider<T extends Comparable<T>> extends 
AbstractExprVisitor<ColumnStatistics, Void, RuntimeException> {
+public class StatisticsProvider<T extends Comparable<T>> extends 
AbstractExprVisitor<ColumnStatistics<?>, Void, RuntimeException> {
 
-  private final Map<SchemaPath, ColumnStatistics> columnStatMap;
+  private final Map<SchemaPath, ColumnStatistics<?>> columnStatMap;
   private final long rowCount;
 
-  public StatisticsProvider(Map<SchemaPath, ColumnStatistics> columnStatMap, 
long rowCount) {
+  public StatisticsProvider(Map<SchemaPath, ColumnStatistics<?>> 
columnStatMap, long rowCount) {
     this.columnStatMap = columnStatMap;
     this.rowCount = rowCount;
   }
@@ -63,14 +63,14 @@ public class StatisticsProvider<T extends Comparable<T>> 
extends AbstractExprVis
   }
 
   @Override
-  public ColumnStatistics visitUnknown(LogicalExpression e, Void value) {
+  public ColumnStatistics<?> visitUnknown(LogicalExpression e, Void value) {
     // do nothing for the unknown expression
     return null;
   }
 
   @Override
-  public ColumnStatistics visitTypedFieldExpr(TypedFieldExpr typedFieldExpr, 
Void value) {
-    ColumnStatistics columnStatistics = 
columnStatMap.get(typedFieldExpr.getPath().getUnIndexed());
+  public ColumnStatistics<?> visitTypedFieldExpr(TypedFieldExpr 
typedFieldExpr, Void value) {
+    ColumnStatistics<?> columnStatistics = 
columnStatMap.get(typedFieldExpr.getPath().getUnIndexed());
     if (columnStatistics != null) {
       return columnStatistics;
     } else if (typedFieldExpr.getMajorType().equals(Types.OPTIONAL_INT)) {
@@ -132,7 +132,7 @@ public class StatisticsProvider<T extends Comparable<T>> 
extends AbstractExprVis
 
   @Override
   @SuppressWarnings("unchecked")
-  public ColumnStatistics 
visitFunctionHolderExpression(FunctionHolderExpression holderExpr, Void value) {
+  public ColumnStatistics<?> 
visitFunctionHolderExpression(FunctionHolderExpression holderExpr, Void value) {
     FuncHolder funcHolder = holderExpr.getHolder();
 
     if (!(funcHolder instanceof DrillSimpleFuncHolder)) {
@@ -143,7 +143,7 @@ public class StatisticsProvider<T extends Comparable<T>> 
extends AbstractExprVis
     String funcName = ((DrillSimpleFuncHolder) 
funcHolder).getRegisteredNames()[0];
 
     if (FunctionReplacementUtils.isCastFunction(funcName)) {
-      ColumnStatistics<T> stat = holderExpr.args.get(0).accept(this, null);
+      ColumnStatistics<T> stat = (ColumnStatistics<T>) 
holderExpr.args.get(0).accept(this, null);
       if (!IsPredicate.isNullOrEmpty(stat)) {
         return evalCastFunc(holderExpr, stat);
       }
@@ -151,7 +151,7 @@ public class StatisticsProvider<T extends Comparable<T>> 
extends AbstractExprVis
     return null;
   }
 
-  private ColumnStatistics evalCastFunc(FunctionHolderExpression holderExpr, 
ColumnStatistics<T> input) {
+  private ColumnStatistics<?> evalCastFunc(FunctionHolderExpression 
holderExpr, ColumnStatistics<T> input) {
     try {
       DrillSimpleFuncHolder funcHolder = (DrillSimpleFuncHolder) 
holderExpr.getHolder();
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/MetastoreParquetTableMetadataProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/MetastoreParquetTableMetadataProvider.java
index 24736bf..acad916 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/MetastoreParquetTableMetadataProvider.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/MetastoreParquetTableMetadataProvider.java
@@ -85,7 +85,7 @@ public class MetastoreParquetTableMetadataProvider implements 
ParquetTableMetada
   private Multimap<Path, RowGroupMetadata> rowGroups;
   private NonInterestingColumnsMetadata nonInterestingColumnsMetadata;
   // stores builder to provide lazy init for fallback 
ParquetTableMetadataProvider
-  private ParquetFileTableMetadataProviderBuilder fallbackBuilder;
+  private final ParquetFileTableMetadataProviderBuilder fallbackBuilder;
   private ParquetTableMetadataProvider fallback;
 
   private MetastoreParquetTableMetadataProvider(List<ReadEntryWithPath> 
entries,
@@ -259,12 +259,12 @@ public class MetastoreParquetTableMetadataProvider 
implements ParquetTableMetada
     if (nonInterestingColumnsMetadata == null) {
       TupleMetadata schema = getTableMetadata().getSchema();
 
-      List<StatisticsHolder> statistics = Collections.singletonList(new 
StatisticsHolder<>(Statistic.NO_COLUMN_STATS, 
ColumnStatisticsKind.NULLS_COUNT));
+      List<StatisticsHolder<?>> statistics = Collections.singletonList(new 
StatisticsHolder<>(Statistic.NO_COLUMN_STATS, 
ColumnStatisticsKind.NULLS_COUNT));
 
       List<SchemaPath> columnPaths = SchemaUtil.getSchemaPaths(schema);
       List<SchemaPath> interestingColumns = getInterestingColumns(columnPaths);
       // populates statistics for non-interesting columns and columns for 
which statistics wasn't collected
-      Map<SchemaPath, ColumnStatistics> columnsStatistics = 
columnPaths.stream()
+      Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = 
columnPaths.stream()
           .filter(schemaPath -> !interestingColumns.contains(schemaPath)
               || SchemaPathUtils.getColumnMetadata(schemaPath, 
schema).isArray())
           .collect(Collectors.toMap(
@@ -315,7 +315,7 @@ public class MetastoreParquetTableMetadataProvider 
implements ParquetTableMetada
 
     // builder for fallback ParquetFileTableMetadataProvider
     // for the case when required metadata is absent in Metastore
-    private ParquetFileTableMetadataProviderBuilder fallback;
+    private final ParquetFileTableMetadataProviderBuilder fallback;
 
     public Builder(MetastoreMetadataProviderManager source) {
       this.metadataProviderManager = source;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/SimpleFileTableMetadataProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/SimpleFileTableMetadataProvider.java
index b3a34658..1ae99e4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/SimpleFileTableMetadataProvider.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/SimpleFileTableMetadataProvider.java
@@ -49,7 +49,7 @@ import java.util.Map;
 public class SimpleFileTableMetadataProvider implements TableMetadataProvider {
   private static final Logger logger = 
LoggerFactory.getLogger(SimpleFileTableMetadataProvider.class);
 
-  private TableMetadata tableMetadata;
+  private final TableMetadata tableMetadata;
 
   private SimpleFileTableMetadataProvider(TableMetadata tableMetadata) {
     this.tableMetadata = tableMetadata;
@@ -111,7 +111,7 @@ public class SimpleFileTableMetadataProvider implements 
TableMetadataProvider {
     private long lastModifiedTime = -1L;
     private TupleMetadata schema;
 
-    private MetadataProviderManager metadataProviderManager;
+    private final MetadataProviderManager metadataProviderManager;
 
     public Builder(MetadataProviderManager source) {
       this.metadataProviderManager = source;
@@ -147,7 +147,7 @@ public class SimpleFileTableMetadataProvider implements 
TableMetadataProvider {
       TableMetadataProvider source = 
metadataProviderManager.getTableMetadataProvider();
       if (source == null) {
         DrillStatsTable statsProvider = 
metadataProviderManager.getStatsProvider();
-        Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
+        Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = new 
HashMap<>();
 
         if (statsProvider != null) {
           if (!statsProvider.isMaterialized()) {
@@ -156,7 +156,7 @@ public class SimpleFileTableMetadataProvider implements 
TableMetadataProvider {
           if (statsProvider.isMaterialized()) {
             for (SchemaPath column : statsProvider.getColumns()) {
               columnsStatistics.put(column,
-                  new 
ColumnStatistics(DrillStatsTable.getEstimatedColumnStats(statsProvider, 
column)));
+                  new 
ColumnStatistics<>(DrillStatsTable.getEstimatedColumnStats(statsProvider, 
column)));
             }
           }
         }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
index 2371c6d..40ab594 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
@@ -261,7 +261,7 @@ public abstract class AbstractGroupScanWithMetadata<P 
extends TableMetadataProvi
    * @return group scan with applied filter expression
    */
   @Override
-  public AbstractGroupScanWithMetadata applyFilter(LogicalExpression 
filterExpr, UdfUtilities udfUtilities,
+  public AbstractGroupScanWithMetadata<?> applyFilter(LogicalExpression 
filterExpr, UdfUtilities udfUtilities,
       FunctionImplementationRegistry functionImplementationRegistry, 
OptionManager optionManager) {
 
     // Builds filter for pruning. If filter cannot be built, null should be 
returned.
@@ -531,7 +531,6 @@ public abstract class AbstractGroupScanWithMetadata<P 
extends TableMetadataProvi
     return columnMetadata != null ? columnMetadata.majorType() : null;
   }
 
-  @SuppressWarnings("unchecked")
   @JsonIgnore
   public <T> T getPartitionValue(Path path, SchemaPath column, Class<T> clazz) 
{
     return getPartitionsMetadata().stream()
@@ -642,7 +641,7 @@ public abstract class AbstractGroupScanWithMetadata<P 
extends TableMetadataProvi
     // and files which belongs to that partitions may be returned
     protected MetadataType overflowLevel = MetadataType.NONE;
 
-    public GroupScanWithMetadataFilterer(AbstractGroupScanWithMetadata source) 
{
+    public GroupScanWithMetadataFilterer(AbstractGroupScanWithMetadata<?> 
source) {
       this.source = source;
     }
 
@@ -651,7 +650,7 @@ public abstract class AbstractGroupScanWithMetadata<P 
extends TableMetadataProvi
      *
      * @return implementation of {@link AbstractGroupScanWithMetadata} with 
filtered metadata
      */
-    public abstract AbstractGroupScanWithMetadata build();
+    public abstract AbstractGroupScanWithMetadata<?> build();
 
     public B table(TableMetadata tableMetadata) {
       this.tableMetadata = tableMetadata;
@@ -968,8 +967,7 @@ public abstract class AbstractGroupScanWithMetadata<P 
extends TableMetadataProvi
           filterPredicate = getFilterPredicate(filterExpression, udfUtilities,
               context, optionManager, true, true, schema);
         }
-        @SuppressWarnings("rawtypes")
-        Map<SchemaPath, ColumnStatistics> columnsStatistics = 
metadata.getColumnsStatistics();
+        Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = 
metadata.getColumnsStatistics();
 
         // adds partition (dir) column statistics if it may be used during 
filter evaluation
         if (metadata instanceof LocationProvider && optionManager != null) {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
index 7ef4741..73a55b0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
@@ -363,7 +363,6 @@ public class MetadataControllerBatch extends 
AbstractBinaryRecordBatch<MetadataC
         .build();
   }
 
-  @SuppressWarnings("rawtypes")
   private List<TableMetadataUnit> getMetadataUnits(TupleReader reader, int 
nestingLevel) {
     List<TableMetadataUnit> metadataUnits = new ArrayList<>();
 
@@ -383,7 +382,7 @@ public class MetadataControllerBatch extends 
AbstractBinaryRecordBatch<MetadataC
       }
     }
 
-    List<StatisticsHolder> metadataStatistics = getMetadataStatistics(reader, 
columnMetadata);
+    List<StatisticsHolder<?>> metadataStatistics = 
getMetadataStatistics(reader, columnMetadata);
 
     Long rowCount = (Long) metadataStatistics.stream()
         .filter(statisticsHolder -> statisticsHolder.getStatisticsKind() == 
TableStatisticsKind.ROW_COUNT)
@@ -391,7 +390,7 @@ public class MetadataControllerBatch extends 
AbstractBinaryRecordBatch<MetadataC
         .map(StatisticsHolder::getStatisticsValue)
         .orElse(null);
 
-    Map<SchemaPath, ColumnStatistics> columnStatistics = 
getColumnStatistics(reader, columnMetadata, rowCount);
+    Map<SchemaPath, ColumnStatistics<?>> columnStatistics = 
getColumnStatistics(reader, columnMetadata, rowCount);
 
     MetadataType metadataType = 
MetadataType.valueOf(metadataColumnReader.scalar().getString());
 
@@ -426,9 +425,8 @@ public class MetadataControllerBatch extends 
AbstractBinaryRecordBatch<MetadataC
     return metadataUnits;
   }
 
-  @SuppressWarnings("rawtypes")
-  private PartitionMetadata getPartitionMetadata(TupleReader reader, 
List<StatisticsHolder> metadataStatistics,
-      Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
+  private PartitionMetadata getPartitionMetadata(TupleReader reader, 
List<StatisticsHolder<?>> metadataStatistics,
+      Map<SchemaPath, ColumnStatistics<?>> columnStatistics, int nestingLevel) 
{
     List<String> segmentColumns = popConfig.getContext().segmentColumns();
 
     String segmentKey = segmentColumns.size() > 0
@@ -458,10 +456,9 @@ public class MetadataControllerBatch extends 
AbstractBinaryRecordBatch<MetadataC
         .build();
   }
 
-  @SuppressWarnings("rawtypes")
-  private BaseTableMetadata getTableMetadata(TupleReader reader, 
List<StatisticsHolder> metadataStatistics,
-      Map<SchemaPath, ColumnStatistics> columnStatistics) {
-    List<StatisticsHolder> updatedMetaStats = new 
ArrayList<>(metadataStatistics);
+  private BaseTableMetadata getTableMetadata(TupleReader reader, 
List<StatisticsHolder<?>> metadataStatistics,
+      Map<SchemaPath, ColumnStatistics<?>> columnStatistics) {
+    List<StatisticsHolder<?>> updatedMetaStats = new 
ArrayList<>(metadataStatistics);
     updatedMetaStats.add(new 
StatisticsHolder<>(popConfig.getContext().analyzeMetadataLevel(), 
TableStatisticsKind.ANALYZE_METADATA_LEVEL));
 
     MetadataInfo metadataInfo = MetadataInfo.builder()
@@ -483,7 +480,7 @@ public class MetadataControllerBatch extends 
AbstractBinaryRecordBatch<MetadataC
 
     if (context.getOptions().getOption(PlannerSettings.STATISTICS_USE)) {
       DrillStatsTable statistics = new 
DrillStatsTable(statisticsCollector.getStatistics());
-      Map<SchemaPath, ColumnStatistics> tableColumnStatistics =
+      Map<SchemaPath, ColumnStatistics<?>> tableColumnStatistics =
           
ParquetTableMetadataUtils.getColumnStatistics(tableMetadata.getSchema(), 
statistics);
       tableMetadata = tableMetadata.cloneWithStats(tableColumnStatistics, 
DrillStatsTable.getEstimatedTableStats(statistics));
     }
@@ -491,9 +488,8 @@ public class MetadataControllerBatch extends 
AbstractBinaryRecordBatch<MetadataC
     return tableMetadata;
   }
 
-  @SuppressWarnings("rawtypes")
-  private SegmentMetadata getSegmentMetadata(TupleReader reader, 
List<StatisticsHolder> metadataStatistics,
-      Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
+  private SegmentMetadata getSegmentMetadata(TupleReader reader, 
List<StatisticsHolder<?>> metadataStatistics,
+      Map<SchemaPath, ColumnStatistics<?>> columnStatistics, int nestingLevel) 
{
     List<String> segmentColumns = popConfig.getContext().segmentColumns();
 
     String segmentKey = segmentColumns.size() > 0
@@ -540,9 +536,8 @@ public class MetadataControllerBatch extends 
AbstractBinaryRecordBatch<MetadataC
         .build();
   }
 
-  @SuppressWarnings("rawtypes")
-  private FileMetadata getFileMetadata(TupleReader reader, 
List<StatisticsHolder> metadataStatistics,
-      Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
+  private FileMetadata getFileMetadata(TupleReader reader, 
List<StatisticsHolder<?>> metadataStatistics,
+      Map<SchemaPath, ColumnStatistics<?>> columnStatistics, int nestingLevel) 
{
     List<String> segmentColumns = popConfig.getContext().segmentColumns();
 
     String segmentKey = segmentColumns.size() > 0
@@ -574,9 +569,8 @@ public class MetadataControllerBatch extends 
AbstractBinaryRecordBatch<MetadataC
         .build();
   }
 
-  @SuppressWarnings("rawtypes")
-  private RowGroupMetadata getRowGroupMetadata(TupleReader 
reader,List<StatisticsHolder> metadataStatistics,
-      Map<SchemaPath, ColumnStatistics> columnStatistics, int nestingLevel) {
+  private RowGroupMetadata getRowGroupMetadata(TupleReader 
reader,List<StatisticsHolder<?>> metadataStatistics,
+      Map<SchemaPath, ColumnStatistics<?>> columnStatistics, int nestingLevel) 
{
 
     List<String> segmentColumns = popConfig.getContext().segmentColumns();
     String segmentKey = segmentColumns.size() > 0
@@ -613,9 +607,8 @@ public class MetadataControllerBatch extends 
AbstractBinaryRecordBatch<MetadataC
         .build();
   }
 
-  @SuppressWarnings("rawtypes")
-  private Map<SchemaPath, ColumnStatistics> getColumnStatistics(TupleReader 
reader, TupleMetadata columnMetadata, Long rowCount) {
-    Multimap<String, StatisticsHolder> columnStatistics = 
ArrayListMultimap.create();
+  private Map<SchemaPath, ColumnStatistics<?>> getColumnStatistics(TupleReader 
reader, TupleMetadata columnMetadata, Long rowCount) {
+    Multimap<String, StatisticsHolder<?>> columnStatistics = 
ArrayListMultimap.create();
     Map<String, TypeProtos.MinorType> columnTypes = new HashMap<>();
     for (ColumnMetadata column : columnMetadata) {
       String fieldName = AnalyzeColumnUtils.getColumnName(column.name());
@@ -633,7 +626,7 @@ public class MetadataControllerBatch extends 
AbstractBinaryRecordBatch<MetadataC
 
     // adds NON_NULL_COUNT to use it during filter pushdown
     if (rowCount != null) {
-      Map<String, StatisticsHolder> nullsCountColumnStatistics = new 
HashMap<>();
+      Map<String, StatisticsHolder<?>> nullsCountColumnStatistics = new 
HashMap<>();
       columnStatistics.asMap().forEach((key, value) ->
           value.stream()
               .filter(statisticsHolder -> statisticsHolder.getStatisticsKind() 
== ColumnStatisticsKind.NON_NULL_VALUES_COUNT)
@@ -647,16 +640,15 @@ public class MetadataControllerBatch extends 
AbstractBinaryRecordBatch<MetadataC
       nullsCountColumnStatistics.forEach(columnStatistics::put);
     }
 
-    Map<SchemaPath, ColumnStatistics> resultingStats = new HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> resultingStats = new HashMap<>();
 
     columnStatistics.asMap().forEach((fieldName, statisticsHolders) ->
         resultingStats.put(SchemaPath.parseFromString(fieldName), new 
ColumnStatistics<>(statisticsHolders, columnTypes.get(fieldName))));
     return resultingStats;
   }
 
-  @SuppressWarnings("rawtypes")
-  private List<StatisticsHolder> getMetadataStatistics(TupleReader reader, 
TupleMetadata columnMetadata) {
-    List<StatisticsHolder> metadataStatistics = new ArrayList<>();
+  private List<StatisticsHolder<?>> getMetadataStatistics(TupleReader reader, 
TupleMetadata columnMetadata) {
+    List<StatisticsHolder<?>> metadataStatistics = new ArrayList<>();
     String rgs = columnNamesOptions.rowGroupStart();
     String rgl = columnNamesOptions.rowGroupLength();
     for (ColumnMetadata column : columnMetadata) {
@@ -753,6 +745,8 @@ public class MetadataControllerBatch extends 
AbstractBinaryRecordBatch<MetadataC
       case FILE: {
         childLocations.add(new 
Path(reader.column(MetastoreAnalyzeConstants.LOCATION_FIELD).scalar().getString()));
       }
+      default:
+        break;
     }
 
     return childLocations;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
index 044d140..5c45150 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataHandlerBatch.java
@@ -170,6 +170,8 @@ public class MetadataHandlerBatch extends 
AbstractSingleRecordBatch<MetadataHand
                   new ArrayList<>(metadataToHandle.values()));
           return populateContainer(segments);
         }
+        default:
+          break;
       }
     }
     return outcome;
@@ -194,7 +196,6 @@ public class MetadataHandlerBatch extends 
AbstractSingleRecordBatch<MetadataHand
     }
   }
 
-  @SuppressWarnings("unchecked")
   private <T extends BaseMetadata & LocationProvider> VectorContainer 
writeMetadata(List<T> metadataList) {
     BaseMetadata firstElement = metadataList.iterator().next();
 
@@ -304,7 +305,6 @@ public class MetadataHandlerBatch extends 
AbstractSingleRecordBatch<MetadataHand
     return new ResultSetLoaderImpl(container.getAllocator(), options);
   }
 
-  @SuppressWarnings("unchecked")
   private <T extends BaseMetadata & LocationProvider> VectorContainer 
writeMetadataUsingBatchSchema(List<T> metadataList) {
     Preconditions.checkArgument(!metadataList.isEmpty(), "Metadata list 
shouldn't be empty.");
 
@@ -413,6 +413,7 @@ public class MetadataHandlerBatch extends 
AbstractSingleRecordBatch<MetadataHand
     container.setEmpty();
   }
 
+  @Override
   protected boolean setupNewSchema() {
     setupSchemaFromContainer(incoming.getContainer());
     return true;
@@ -473,6 +474,8 @@ public class MetadataHandlerBatch extends 
AbstractSingleRecordBatch<MetadataHand
           }
           break;
         }
+        default:
+          break;
       }
     }
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java
index 9e4e925..70f3e16 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java
@@ -263,6 +263,7 @@ public class DrillStatsTable {
     @JsonProperty ("directories") List<DirectoryStatistics_v0> 
directoryStatistics;
     // Default constructor required for deserializer
     public Statistics_v0 () { }
+    @Override
     @JsonGetter ("directories")
     public List<DirectoryStatistics_v0> getDirectoryStatistics() {
       return directoryStatistics;
@@ -296,6 +297,7 @@ public class DrillStatsTable {
     List<DirectoryStatistics_v1> directoryStatistics;
     // Default constructor required for deserializer
     public Statistics_v1 () { }
+    @Override
     @JsonGetter ("directories")
     public List<DirectoryStatistics_v1> getDirectoryStatistics() {
       return directoryStatistics;
@@ -396,7 +398,7 @@ public class DrillStatsTable {
     }
     @JsonIgnore
     public void buildHistogram(byte[] tdigest_bytearray) {
-      int num_buckets = (int) Math.min(ndv, (long) 
DrillStatsTable.NUM_HISTOGRAM_BUCKETS);
+      int num_buckets = (int) Math.min(ndv, 
DrillStatsTable.NUM_HISTOGRAM_BUCKETS);
       this.histogram = 
HistogramUtils.buildHistogramFromTDigest(tdigest_bytearray, this.getType(),
               num_buckets, nonNullCount);
     }
@@ -479,9 +481,9 @@ public class DrillStatsTable {
    * @param statsProvider the source of statistics
    * @return list of {@link StatisticsKind} and statistics values
    */
-  public static List<StatisticsHolder> getEstimatedTableStats(DrillStatsTable 
statsProvider) {
+  public static List<StatisticsHolder<?>> 
getEstimatedTableStats(DrillStatsTable statsProvider) {
     if (statsProvider != null && statsProvider.isMaterialized()) {
-      List<StatisticsHolder> tableStatistics = Arrays.asList(
+      List<StatisticsHolder<?>> tableStatistics = Arrays.asList(
           new StatisticsHolder<>(statsProvider.getRowCount(), 
TableStatisticsKind.EST_ROW_COUNT),
           new StatisticsHolder<>(Boolean.TRUE, 
TableStatisticsKind.HAS_DESCRIPTIVE_STATISTICS));
       return tableStatistics;
@@ -496,9 +498,9 @@ public class DrillStatsTable {
    * @param fieldName     name of the columns whose statistics should be 
obtained
    * @return list of {@link StatisticsKind} and statistics values
    */
-  public static List<StatisticsHolder> getEstimatedColumnStats(DrillStatsTable 
statsProvider, SchemaPath fieldName) {
+  public static List<StatisticsHolder<?>> 
getEstimatedColumnStats(DrillStatsTable statsProvider, SchemaPath fieldName) {
     if (statsProvider != null && statsProvider.isMaterialized()) {
-      List<StatisticsHolder> statisticsValues = new ArrayList<>();
+      List<StatisticsHolder<?>> statisticsValues = new ArrayList<>();
       Double ndv = statsProvider.getNdv(fieldName);
       if (ndv != null) {
         statisticsValues.add(new StatisticsHolder<>(ndv, 
ColumnStatisticsKind.NDV));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
index 3053297..9f2901b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
@@ -163,7 +163,7 @@ public class DrillRelMdDistinctRowCount extends 
RelMdDistinctRowCount{
       if (!groupKey.get(i)) {
         continue;
       }
-      ColumnStatistics columnStatistics = tableMetadata != null ?
+      ColumnStatistics<?> columnStatistics = tableMetadata != null ?
           tableMetadata.getColumnStatistics(SchemaPath.getSimplePath(colName)) 
: null;
       Double ndv = columnStatistics != null ? 
ColumnStatisticsKind.NDV.getFrom(columnStatistics) : null;
       // Skip NDV, if not available
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java
index 49d2129..9f3beca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdSelectivity.java
@@ -187,6 +187,7 @@ public class DrillRelMdSelectivity extends RelMdSelectivity 
{
             try {
               RexVisitor<Void> visitor =
                       new RexVisitorImpl<Void>(true) {
+                        @Override
                         public Void visitCall(RexCall call) {
                           if (call.getKind() != SqlKind.EQUALS) {
                             throw new Util.FoundOne(call);
@@ -294,7 +295,7 @@ public class DrillRelMdSelectivity extends RelMdSelectivity 
{
   private double computeEqualsSelectivity(TableMetadata tableMetadata, RexNode 
orPred, List<SchemaPath> fieldNames) {
     SchemaPath col = getColumn(orPred, fieldNames);
     if (col != null) {
-      ColumnStatistics columnStatistics = tableMetadata != null ? 
tableMetadata.getColumnStatistics(col) : null;
+      ColumnStatistics<?> columnStatistics = tableMetadata != null ? 
tableMetadata.getColumnStatistics(col) : null;
       Double ndv = columnStatistics != null ? 
ColumnStatisticsKind.NDV.getFrom(columnStatistics) : null;
       if (ndv != null) {
         return 1.00 / ndv;
@@ -307,7 +308,7 @@ public class DrillRelMdSelectivity extends RelMdSelectivity 
{
   private double computeRangeSelectivity(TableMetadata tableMetadata, RexNode 
orPred, List<SchemaPath> fieldNames) {
     SchemaPath col = getColumn(orPred, fieldNames);
     if (col != null) {
-      ColumnStatistics columnStatistics = tableMetadata != null ? 
tableMetadata.getColumnStatistics(col) : null;
+      ColumnStatistics<?> columnStatistics = tableMetadata != null ? 
tableMetadata.getColumnStatistics(col) : null;
       Histogram histogram = columnStatistics != null ? 
ColumnStatisticsKind.HISTOGRAM.getFrom(columnStatistics) : null;
       if (histogram != null) {
         Double totalCount = 
ColumnStatisticsKind.ROWCOUNT.getFrom(columnStatistics);
@@ -324,7 +325,7 @@ public class DrillRelMdSelectivity extends RelMdSelectivity 
{
   private double computeIsNotNullSelectivity(TableMetadata tableMetadata, 
RexNode orPred, List<SchemaPath> fieldNames) {
     SchemaPath col = getColumn(orPred, fieldNames);
     if (col != null) {
-      ColumnStatistics columnStatistics = tableMetadata != null ? 
tableMetadata.getColumnStatistics(col) : null;
+      ColumnStatistics<?> columnStatistics = tableMetadata != null ? 
tableMetadata.getColumnStatistics(col) : null;
       Double nonNullCount = columnStatistics != null ? 
ColumnStatisticsKind.NON_NULL_COUNT.getFrom(columnStatistics) : null;
       if (nonNullCount != null) {
         // Cap selectivity below Calcite Guess
@@ -423,6 +424,7 @@ public class DrillRelMdSelectivity extends RelMdSelectivity 
{
     try {
       RexVisitor<Void> visitor =
           new RexVisitorImpl<Void>(true) {
+            @Override
             public Void visitCall(RexCall call) {
               for (RexNode child : call.getOperands()) {
                 child.accept(this);
@@ -430,6 +432,7 @@ public class DrillRelMdSelectivity extends RelMdSelectivity 
{
               return super.visitCall(call);
             }
 
+            @Override
             public Void visitInputRef(RexInputRef inputRef) {
               throw new Util.FoundOne(inputRef);
             }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index a9bff98..ac2c3ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -229,7 +229,7 @@ public abstract class AbstractParquetGroupScan extends 
AbstractGroupScanWithMeta
    * @return group scan with applied filter expression
    */
   @Override
-  public AbstractGroupScanWithMetadata applyFilter(LogicalExpression 
filterExpr, UdfUtilities udfUtilities,
+  public AbstractGroupScanWithMetadata<?> applyFilter(LogicalExpression 
filterExpr, UdfUtilities udfUtilities,
       FunctionImplementationRegistry functionImplementationRegistry, 
OptionManager optionManager) {
     // Builds filter for pruning. If filter cannot be built, null should be 
returned.
     FilterPredicate<?> filterPredicate = getFilterPredicate(filterExpr, 
udfUtilities, functionImplementationRegistry, optionManager, true);
@@ -481,7 +481,7 @@ public abstract class AbstractParquetGroupScan extends 
AbstractGroupScanWithMeta
   protected abstract static class RowGroupScanFilterer<B extends 
RowGroupScanFilterer<B>> extends GroupScanWithMetadataFilterer<B> {
     protected Multimap<Path, RowGroupMetadata> rowGroups = 
LinkedListMultimap.create();
 
-    public RowGroupScanFilterer(AbstractGroupScanWithMetadata source) {
+    public RowGroupScanFilterer(AbstractGroupScanWithMetadata<?> source) {
       super(source);
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index de555c2..74b4c17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -102,7 +102,7 @@ public abstract class AbstractParquetScanBatchCreator {
       Path prevRowGroupPath = null;
       Metadata_V4.ParquetTableMetadata_v4 tableMetadataV4 = null;
       Metadata_V4.ParquetFileAndRowCountMetadata fileMetadataV4 = null;
-      FilterPredicate filterPredicate = null;
+      FilterPredicate<?> filterPredicate = null;
       Set<SchemaPath> schemaPathsInExpr = null;
       Set<SchemaPath> columnsInExpr = null;
       // for debug/info logging
@@ -135,7 +135,7 @@ public abstract class AbstractParquetScanBatchCreator {
         Here we could store a map from file names to footers, to prevent 
re-reading the footer for each row group in a file
         TODO - to prevent reading the footer again in the parquet record 
reader (it is read earlier in the ParquetStorageEngine)
         we should add more information to the RowGroupInfo that will be 
populated upon the first read to
-        provide the reader with all of th file meta-data it needs
+        provide the reader with all of the file meta-data it needs
         These fields will be added to the constructor below
         */
 
@@ -190,7 +190,7 @@ public abstract class AbstractParquetScanBatchCreator {
 
             MetadataBase.RowGroupMetadata rowGroupMetadata = 
fileMetadataV4.getFileMetadata().getRowGroups().get(rowGroup.getRowGroupIndex());
 
-            Map<SchemaPath, ColumnStatistics> columnsStatistics = 
ParquetTableMetadataUtils.getRowGroupColumnStatistics(tableMetadataV4, 
rowGroupMetadata);
+            Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = 
ParquetTableMetadataUtils.getRowGroupColumnStatistics(tableMetadataV4, 
rowGroupMetadata);
 
             try {
               Map<SchemaPath, TypeProtos.MajorType> intermediateColumns =
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BaseParquetMetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BaseParquetMetadataProvider.java
index ccf5c15..b535e0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BaseParquetMetadataProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BaseParquetMetadataProvider.java
@@ -207,7 +207,7 @@ public abstract class BaseParquetMetadataProvider 
implements ParquetMetadataProv
   @Override
   public TableMetadata getTableMetadata() {
     if (tableMetadata == null) {
-      List<StatisticsHolder> tableStatistics = new 
ArrayList<>(DrillStatsTable.getEstimatedTableStats(statsTable));
+      List<StatisticsHolder<?>> tableStatistics = new 
ArrayList<>(DrillStatsTable.getEstimatedTableStats(statsTable));
       Map<SchemaPath, TypeProtos.MajorType> fields = 
ParquetTableMetadataUtils.resolveFields(parquetTableMetadata);
       Map<SchemaPath, TypeProtos.MajorType> intermediateFields = 
ParquetTableMetadataUtils.resolveIntermediateFields(parquetTableMetadata);
 
@@ -223,7 +223,7 @@ public abstract class BaseParquetMetadataProvider 
implements ParquetMetadataProv
         });
       }
 
-      Map<SchemaPath, ColumnStatistics> columnsStatistics;
+      Map<SchemaPath, ColumnStatistics<?>> columnsStatistics;
       if (collectMetadata) {
         Collection<? extends BaseMetadata> metadata = 
getFilesMetadataMap().values();
         if (metadata.isEmpty()) {
@@ -243,18 +243,18 @@ public abstract class BaseParquetMetadataProvider 
implements ParquetMetadataProv
         fields.forEach((columnPath, value) -> {
           long columnValueCount = 
getParquetGroupScanStatistics().getColumnValueCount(columnPath);
           // Adds statistics values itself if statistics is available
-          List<StatisticsHolder> stats = new 
ArrayList<>(DrillStatsTable.getEstimatedColumnStats(statsTable, columnPath));
+          List<StatisticsHolder<?>> stats = new 
ArrayList<>(DrillStatsTable.getEstimatedColumnStats(statsTable, columnPath));
           unhandledColumns.remove(columnPath);
 
           // adds statistics for partition columns
           stats.add(new StatisticsHolder<>(columnValueCount, 
TableStatisticsKind.ROW_COUNT));
           stats.add(new 
StatisticsHolder<>(getParquetGroupScanStatistics().getRowCount() - 
columnValueCount, ColumnStatisticsKind.NULLS_COUNT));
-          columnsStatistics.put(columnPath, new ColumnStatistics(stats, 
value.getMinorType()));
+          columnsStatistics.put(columnPath, new ColumnStatistics<>(stats, 
value.getMinorType()));
         });
 
         for (SchemaPath column : unhandledColumns) {
           columnsStatistics.put(column,
-              new 
ColumnStatistics(DrillStatsTable.getEstimatedColumnStats(statsTable, column)));
+              new 
ColumnStatistics<>(DrillStatsTable.getEstimatedColumnStats(statsTable, 
column)));
         }
       }
       MetadataInfo metadataInfo = 
MetadataInfo.builder().type(MetadataType.TABLE).build();
@@ -327,9 +327,9 @@ public abstract class BaseParquetMetadataProvider 
implements ParquetMetadataProv
           partitionPaths.forEach((path, value) -> 
partitionsForValue.put(value, path));
 
           partitionsForValue.asMap().forEach((partitionKey, value) -> {
-            Map<SchemaPath, ColumnStatistics> columnsStatistics = new 
HashMap<>();
+            Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = new 
HashMap<>();
 
-            List<StatisticsHolder> statistics = new ArrayList<>();
+            List<StatisticsHolder<?>> statistics = new ArrayList<>();
             partitionKey = partitionKey == NULL_VALUE ? null : partitionKey;
             statistics.add(new StatisticsHolder<>(partitionKey, 
ColumnStatisticsKind.MIN_VALUE));
             statistics.add(new StatisticsHolder<>(partitionKey, 
ColumnStatisticsKind.MAX_VALUE));
@@ -381,6 +381,7 @@ public abstract class BaseParquetMetadataProvider 
implements ParquetMetadataProv
         .collect(Collectors.toList());
   }
 
+  @SuppressWarnings("unused")
   @Override
   public Map<Path, SegmentMetadata> getSegmentsMetadataMap() {
     if (segments == null) {
@@ -456,7 +457,7 @@ public abstract class BaseParquetMetadataProvider 
implements ParquetMetadataProv
    */
   private static <T extends BaseMetadata & LocationProvider> SegmentMetadata 
combineToSegmentMetadata(Collection<T> metadataList,
       SchemaPath column, Set<Path> metadataLocations) {
-    List<StatisticsHolder> segmentStatistics =
+    List<StatisticsHolder<?>> segmentStatistics =
         Collections.singletonList(
             new StatisticsHolder<>(
                 TableStatisticsKind.ROW_COUNT.mergeStatistics(metadataList),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
index 022975e..ffde9c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
@@ -66,7 +66,7 @@ public class FilterEvaluatorUtils {
 
     RowGroupMetadata rowGroupMetadata = new 
ArrayList<>(ParquetTableMetadataUtils.getRowGroupsMetadata(footer).values()).get(rowGroupIndex);
     NonInterestingColumnsMetadata nonInterestingColumnsMetadata = 
ParquetTableMetadataUtils.getNonInterestingColumnsMeta(footer);
-    Map<SchemaPath, ColumnStatistics> columnsStatistics = 
rowGroupMetadata.getColumnsStatistics();
+    Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = 
rowGroupMetadata.getColumnsStatistics();
 
     // Add column statistics of non-interesting columns if there are any
     
columnsStatistics.putAll(nonInterestingColumnsMetadata.getColumnsStatistics());
@@ -78,7 +78,7 @@ public class FilterEvaluatorUtils {
         fragmentContext, fragmentContext.getFunctionRegistry(), new 
HashSet<>(schemaPathsInExpr));
   }
 
-  public static RowsMatch matches(LogicalExpression expr, Map<SchemaPath, 
ColumnStatistics> columnsStatistics, TupleMetadata schema,
+  public static RowsMatch matches(LogicalExpression expr, Map<SchemaPath, 
ColumnStatistics<?>> columnsStatistics, TupleMetadata schema,
                                   long rowCount, UdfUtilities udfUtilities, 
FunctionLookupContext functionImplementationRegistry,
                                   Set<SchemaPath> schemaPathsInExpr) {
     ErrorCollector errorCollector = new ErrorCollectorImpl();
@@ -95,21 +95,22 @@ public class FilterEvaluatorUtils {
     }
 
     Set<LogicalExpression> constantBoundaries = 
ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
-    FilterPredicate parquetPredicate = FilterBuilder.buildFilterPredicate(
+    FilterPredicate<?> parquetPredicate = FilterBuilder.buildFilterPredicate(
         materializedFilter, constantBoundaries, udfUtilities, true);
 
     return matches(parquetPredicate, columnsStatistics, rowCount, schema, 
schemaPathsInExpr);
   }
 
   @SuppressWarnings("unchecked")
-  public static RowsMatch matches(FilterPredicate parquetPredicate,
-                                  Map<SchemaPath, ColumnStatistics> 
columnsStatistics,
+  public static <T extends Comparable<T>> RowsMatch matches(FilterPredicate<T> 
parquetPredicate,
+                                  Map<SchemaPath, ColumnStatistics<?>> 
columnsStatistics,
                                   long rowCount,
                                   TupleMetadata fileMetadata,
                                   Set<SchemaPath> schemaPathsInExpr) {
     RowsMatch rowsMatch = RowsMatch.SOME;
     if (parquetPredicate != null) {
-      StatisticsProvider rangeExprEvaluator = new 
StatisticsProvider(columnsStatistics, rowCount);
+      @SuppressWarnings("rawtypes")
+      StatisticsProvider<T> rangeExprEvaluator = new 
StatisticsProvider(columnsStatistics, rowCount);
       rowsMatch = parquetPredicate.matches(rangeExprEvaluator);
     }
     return rowsMatch == RowsMatch.ALL && isRepeated(schemaPathsInExpr, 
fileMetadata) ? RowsMatch.SOME : rowsMatch;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 051b436..5c0e2e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -65,10 +65,10 @@ public class ParquetGroupScan extends 
AbstractParquetGroupScan {
   private final ParquetFormatPlugin formatPlugin;
   private final ParquetFormatConfig formatConfig;
 
-  private boolean usedMetadataCache; // false by default
+  private final boolean usedMetadataCache; // false by default
   // may change when filter push down / partition pruning is applied
-  private Path selectionRoot;
-  private Path cacheFileRoot;
+  private final Path selectionRoot;
+  private final Path cacheFileRoot;
 
   @SuppressWarnings("unused")
   @JsonCreator
@@ -198,6 +198,7 @@ public class ParquetGroupScan extends 
AbstractParquetGroupScan {
     return formatPlugin.getStorageConfig();
   }
 
+  @Override
   @JsonProperty
   public Path getSelectionRoot() {
     return selectionRoot;
@@ -288,7 +289,7 @@ public class ParquetGroupScan extends 
AbstractParquetGroupScan {
   }
 
   @Override
-  protected RowGroupScanFilterer getFilterer() {
+  protected RowGroupScanFilterer<?> getFilterer() {
     return new ParquetGroupScanFilterer(this);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
index 17a7c55..f15409d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
@@ -59,8 +59,7 @@ public class ParquetGroupScanStatistics<T extends 
BaseMetadata & LocationProvide
     collect(rowGroupInfos);
   }
 
-  @SuppressWarnings("unchecked")
-  public ParquetGroupScanStatistics(ParquetGroupScanStatistics that) {
+  public ParquetGroupScanStatistics(ParquetGroupScanStatistics<T> that) {
     this.partitionValueMap = HashBasedTable.create(that.partitionValueMap);
     this.partitionColTypeMap = new HashMap<>(that.partitionColTypeMap);
     this.columnValueCounts = new HashMap<>(that.columnValueCounts);
@@ -101,9 +100,9 @@ public class ParquetGroupScanStatistics<T extends 
BaseMetadata & LocationProvide
     boolean first = true;
     for (T metadata : metadataList) {
       long localRowCount = TableStatisticsKind.ROW_COUNT.getValue(metadata);
-      for (Map.Entry<SchemaPath, ColumnStatistics> columnsStatistics : 
metadata.getColumnsStatistics().entrySet()) {
+      for (Map.Entry<SchemaPath, ColumnStatistics<?>> columnsStatistics : 
metadata.getColumnsStatistics().entrySet()) {
         SchemaPath schemaPath = columnsStatistics.getKey();
-        ColumnStatistics statistics = columnsStatistics.getValue();
+        ColumnStatistics<?> statistics = columnsStatistics.getValue();
         MutableLong emptyCount = new MutableLong();
         MutableLong previousCount = columnValueCounts.putIfAbsent(schemaPath, 
emptyCount);
         if (previousCount == null) {
@@ -164,7 +163,7 @@ public class ParquetGroupScanStatistics<T extends 
BaseMetadata & LocationProvide
    * @param rowCount         row count
    * @return whether column is a potential partition column
    */
-  private boolean checkForPartitionColumn(ColumnStatistics columnStatistics,
+  private boolean checkForPartitionColumn(ColumnStatistics<?> columnStatistics,
                                           boolean first,
                                           long rowCount,
                                           TypeProtos.MajorType type,
@@ -202,11 +201,11 @@ public class ParquetGroupScanStatistics<T extends 
BaseMetadata & LocationProvide
    * @param rowCount         rows count in column chunk
    * @return true if column has single value
    */
-  private boolean hasSingleValue(ColumnStatistics columnStatistics, long 
rowCount) {
+  private boolean hasSingleValue(ColumnStatistics<?> columnStatistics, long 
rowCount) {
     return columnStatistics != null && isSingleVal(columnStatistics, rowCount);
   }
 
-  private boolean isSingleVal(ColumnStatistics columnStatistics, long 
rowCount) {
+  private boolean isSingleVal(ColumnStatistics<?> columnStatistics, long 
rowCount) {
     Long numNulls = ColumnStatisticsKind.NULLS_COUNT.getFrom(columnStatistics);
     if (numNulls != null && numNulls != Statistic.NO_COLUMN_STATS) {
       Object min = columnStatistics.get(ColumnStatisticsKind.MIN_VALUE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
index bd2e119..219359b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
@@ -138,7 +138,7 @@ public abstract class ParquetPushDownFilter extends 
StoragePluginOptimizerRule {
 
     final List<RexNode> qualifiedPredList = new ArrayList<>();
 
-    // list of predicates which cannot be converted to parquet filter predicate
+    // list of predicates which cannot be converted to Parquet filter predicate
     List<RexNode> nonConvertedPredList = new ArrayList<>();
 
     for (RexNode pred : predList) {
@@ -147,7 +147,7 @@ public abstract class ParquetPushDownFilter extends 
StoragePluginOptimizerRule {
             new 
DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, pred);
 
         // checks whether predicate may be used for filter pushdown
-        FilterPredicate parquetFilterPredicate =
+        FilterPredicate<?> parquetFilterPredicate =
             groupScan.getFilterPredicate(drillPredicate,
                 optimizerContext,
                 optimizerContext.getFunctionRegistry(), 
optimizerContext.getPlannerSettings().getOptions(), false);
@@ -171,11 +171,11 @@ public abstract class ParquetPushDownFilter extends 
StoragePluginOptimizerRule {
     LogicalExpression conditionExp = DrillOptiq.toDrill(
         new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), 
scan, qualifiedPred);
 
-    // Default - pass the original filter expr to (potentialy) be used at 
run-time
+    // Default - pass the original filter expr to (potentially) be used at 
run-time
     groupScan.setFilterForRuntime(conditionExp, optimizerContext); // later 
may remove or set to another filter (see below)
 
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : 
null;
-    AbstractGroupScanWithMetadata newGroupScan = 
groupScan.applyFilter(conditionExp, optimizerContext,
+    AbstractGroupScanWithMetadata<?> newGroupScan = 
groupScan.applyFilter(conditionExp, optimizerContext,
         optimizerContext.getFunctionRegistry(), 
optimizerContext.getPlannerSettings().getOptions());
     if (timer != null) {
       logger.debug("Took {} ms to apply filter on parquet row groups. ", 
timer.elapsed(TimeUnit.MILLISECONDS));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
index 718f3c0..68254d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java
@@ -80,7 +80,7 @@ import java.util.stream.Collectors;
 @SuppressWarnings("WeakerAccess")
 public class ParquetTableMetadataUtils {
 
-  static final List<CollectableColumnStatisticsKind> PARQUET_COLUMN_STATISTICS 
=
+  static final List<CollectableColumnStatisticsKind<?>> 
PARQUET_COLUMN_STATISTICS =
           ImmutableList.of(
               ColumnStatisticsKind.MAX_VALUE,
               ColumnStatisticsKind.MIN_VALUE,
@@ -102,8 +102,8 @@ public class ParquetTableMetadataUtils {
    * @param supportsFileImplicitColumns whether implicit columns are supported
    * @return map with added statistics for implicit and partition (dir) columns
    */
-  public static Map<SchemaPath, ColumnStatistics> addImplicitColumnsStatistics(
-      Map<SchemaPath, ColumnStatistics> columnsStatistics, List<SchemaPath> 
columns,
+  public static Map<SchemaPath, ColumnStatistics<?>> 
addImplicitColumnsStatistics(
+      Map<SchemaPath, ColumnStatistics<?>> columnsStatistics, List<SchemaPath> 
columns,
       List<String> partitionValues, OptionManager optionManager, Path 
location, boolean supportsFileImplicitColumns) {
     ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
 
@@ -157,11 +157,11 @@ public class ParquetTableMetadataUtils {
    */
   public static RowGroupMetadata 
getRowGroupMetadata(MetadataBase.ParquetTableMetadataBase tableMetadata,
       MetadataBase.RowGroupMetadata rowGroupMetadata, int rgIndexInFile, Path 
location) {
-    Map<SchemaPath, ColumnStatistics> columnsStatistics = 
getRowGroupColumnStatistics(tableMetadata, rowGroupMetadata);
-    List<StatisticsHolder> rowGroupStatistics = new ArrayList<>();
+    Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = 
getRowGroupColumnStatistics(tableMetadata, rowGroupMetadata);
+    List<StatisticsHolder<?>> rowGroupStatistics = new ArrayList<>();
     rowGroupStatistics.add(new 
StatisticsHolder<>(rowGroupMetadata.getRowCount(), 
TableStatisticsKind.ROW_COUNT));
-    rowGroupStatistics.add(new StatisticsHolder<>(rowGroupMetadata.getStart(), 
new BaseStatisticsKind(ExactStatisticsConstants.START, true)));
-    rowGroupStatistics.add(new 
StatisticsHolder<>(rowGroupMetadata.getLength(), new 
BaseStatisticsKind(ExactStatisticsConstants.LENGTH, true)));
+    rowGroupStatistics.add(new StatisticsHolder<>(rowGroupMetadata.getStart(), 
new BaseStatisticsKind<>(ExactStatisticsConstants.START, true)));
+    rowGroupStatistics.add(new 
StatisticsHolder<>(rowGroupMetadata.getLength(), new 
BaseStatisticsKind<>(ExactStatisticsConstants.LENGTH, true)));
 
     Map<SchemaPath, TypeProtos.MajorType> columns = 
getRowGroupFields(tableMetadata, rowGroupMetadata);
     Map<SchemaPath, TypeProtos.MajorType> intermediateColumns = 
getIntermediateFields(tableMetadata, rowGroupMetadata);
@@ -195,7 +195,7 @@ public class ParquetTableMetadataUtils {
     if (rowGroups.isEmpty()) {
       return null;
     }
-    List<StatisticsHolder> fileStatistics = new ArrayList<>();
+    List<StatisticsHolder<?>> fileStatistics = new ArrayList<>();
     fileStatistics.add(new 
StatisticsHolder<>(TableStatisticsKind.ROW_COUNT.mergeStatistics(rowGroups), 
TableStatisticsKind.ROW_COUNT));
 
     RowGroupMetadata rowGroupMetadata = rowGroups.iterator().next();
@@ -255,10 +255,10 @@ public class ParquetTableMetadataUtils {
    * @param rowGroupMetadata metadata to convert
    * @return map with converted row group metadata
    */
-  public static Map<SchemaPath, ColumnStatistics> getRowGroupColumnStatistics(
+  public static Map<SchemaPath, ColumnStatistics<?>> 
getRowGroupColumnStatistics(
       MetadataBase.ParquetTableMetadataBase tableMetadata, 
MetadataBase.RowGroupMetadata rowGroupMetadata) {
 
-    Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = new HashMap<>();
 
     for (MetadataBase.ColumnMetadata column : rowGroupMetadata.getColumns()) {
       SchemaPath colPath = SchemaPath.getCompoundPath(column.getName());
@@ -271,7 +271,7 @@ public class ParquetTableMetadataUtils {
       OriginalType originalType = getOriginalType(tableMetadata, column);
       TypeProtos.MinorType type = 
ParquetReaderUtility.getMinorType(primitiveType, originalType);
 
-      List<StatisticsHolder> statistics = new ArrayList<>();
+      List<StatisticsHolder<?>> statistics = new ArrayList<>();
       statistics.add(new StatisticsHolder<>(getValue(column.getMinValue(), 
primitiveType, originalType), ColumnStatisticsKind.MIN_VALUE));
       statistics.add(new StatisticsHolder<>(getValue(column.getMaxValue(), 
primitiveType, originalType), ColumnStatisticsKind.MAX_VALUE));
       statistics.add(new StatisticsHolder<>(nulls, 
ColumnStatisticsKind.NULLS_COUNT));
@@ -286,7 +286,7 @@ public class ParquetTableMetadataUtils {
    * @return returns non-interesting columns metadata
    */
   public static NonInterestingColumnsMetadata 
getNonInterestingColumnsMeta(MetadataBase.ParquetTableMetadataBase 
parquetTableMetadata) {
-    Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = new HashMap<>();
     if (parquetTableMetadata instanceof Metadata_V4.ParquetTableMetadata_v4) {
       Map<Metadata_V4.ColumnTypeMetadata_v4.Key, 
Metadata_V4.ColumnTypeMetadata_v4> columnTypeInfoMap =
               ((Metadata_V4.ParquetTableMetadata_v4) 
parquetTableMetadata).getColumnTypeInfoMap();
@@ -298,7 +298,7 @@ public class ParquetTableMetadataUtils {
       for (Metadata_V4.ColumnTypeMetadata_v4 columnTypeMetadata : 
columnTypeInfoMap.values()) {
         if (!columnTypeMetadata.isInteresting) {
           SchemaPath schemaPath = 
SchemaPath.getCompoundPath(columnTypeMetadata.name);
-          List<StatisticsHolder> statistics = new ArrayList<>();
+          List<StatisticsHolder<?>> statistics = new ArrayList<>();
           statistics.add(new StatisticsHolder<>(Statistic.NO_COLUMN_STATS, 
ColumnStatisticsKind.NULLS_COUNT));
           PrimitiveType.PrimitiveTypeName primitiveType = 
columnTypeMetadata.primitiveType;
           OriginalType originalType = columnTypeMetadata.originalType;
@@ -371,7 +371,8 @@ public class ParquetTableMetadataUtils {
     } else if (value instanceof String) { // value is obtained from metadata 
cache v2+
       return ((String) value).getBytes();
     } else if (value instanceof Map) { // value is obtained from metadata 
cache v1
-      String bytesString = (String) ((Map) value).get("bytes");
+      @SuppressWarnings("unchecked")
+      String bytesString = ((Map<String,String>) value).get("bytes");
       if (bytesString != null) {
         return bytesString.getBytes();
       }
@@ -647,7 +648,7 @@ public class ParquetTableMetadataUtils {
    * @param statistics source of column statistics
    * @return map with schema path and {@link ColumnStatistics}
    */
-  public static Map<SchemaPath, ColumnStatistics> 
getColumnStatistics(TupleMetadata schema, DrillStatsTable statistics) {
+  public static Map<SchemaPath, ColumnStatistics<?>> 
getColumnStatistics(TupleMetadata schema, DrillStatsTable statistics) {
     List<SchemaPath> schemaPaths = SchemaUtil.getSchemaPaths(schema);
 
     return schemaPaths.stream()
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchemaWithMetastore.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchemaWithMetastore.java
index 6ce32e4..c89472b 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchemaWithMetastore.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchemaWithMetastore.java
@@ -209,21 +209,21 @@ public class TestInfoSchemaWithMetastore extends 
ClusterTest {
     schema.addColumn(varcharCol);
     schema.addColumn(timestampColumn);
 
-    Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = new HashMap<>();
     columnsStatistics.put(SchemaPath.parseFromString("varchar_col"),
-      new ColumnStatistics(Arrays.asList(
+      new ColumnStatistics<>(Arrays.asList(
         new StatisticsHolder<>("aaa", ColumnStatisticsKind.MIN_VALUE),
         new StatisticsHolder<>("zzz", ColumnStatisticsKind.MAX_VALUE))));
     
columnsStatistics.put(SchemaPath.parseFromString("struct_col.nested_struct.nested_struct_varchar"),
-      new ColumnStatistics(Arrays.asList(
+      new ColumnStatistics<>(Arrays.asList(
         new StatisticsHolder<>("bbb", ColumnStatisticsKind.MIN_VALUE),
         new StatisticsHolder<>("ccc", ColumnStatisticsKind.MAX_VALUE))));
     columnsStatistics.put(SchemaPath.parseFromString("bigint_col"),
-      new ColumnStatistics(Arrays.asList(
+      new ColumnStatistics<>(Arrays.asList(
         new StatisticsHolder<>(100L, ColumnStatisticsKind.NULLS_COUNT),
         new StatisticsHolder<>(10.5D, ColumnStatisticsKind.NDV))));
     
columnsStatistics.put(SchemaPath.parseFromString("struct_col.struct_bigint"),
-      new ColumnStatistics(Collections.singletonList(
+      new ColumnStatistics<>(Collections.singletonList(
         new StatisticsHolder<>(10.5D, ColumnStatisticsKind.NON_NULL_COUNT))));
 
     ZonedDateTime currentTime = currentUtcTime();
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
index ff2108b..b8bf889 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
@@ -81,7 +81,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 @Category({SlowTest.class, MetastoreTest.class})
-@SuppressWarnings({"rawtypes", "unchecked"})
 public class TestMetastoreCommands extends ClusterTest {
 
   private static final TupleMetadata SCHEMA = new SchemaBuilder()
@@ -98,7 +97,8 @@ public class TestMetastoreCommands extends ClusterTest {
       .add("o_comment", TypeProtos.MinorType.VARCHAR)
       .build();
 
-  private static final Map<SchemaPath, ColumnStatistics> 
TABLE_COLUMN_STATISTICS = ImmutableMap.<SchemaPath, ColumnStatistics>builder()
+  private static final Map<SchemaPath, ColumnStatistics<?>> 
TABLE_COLUMN_STATISTICS =
+      ImmutableMap.<SchemaPath, ColumnStatistics<?>>builder()
       .put(SchemaPath.getSimplePath("o_shippriority"),
           getColumnStatistics(0, 0, 120L, TypeProtos.MinorType.INT))
       .put(SchemaPath.getSimplePath("o_orderstatus"),
@@ -124,7 +124,8 @@ public class TestMetastoreCommands extends ClusterTest {
           getColumnStatistics(757382400000L, 850953600000L, 120L, 
TypeProtos.MinorType.DATE))
       .build();
 
-  private static final Map<SchemaPath, ColumnStatistics> 
DIR0_1994_SEGMENT_COLUMN_STATISTICS = ImmutableMap.<SchemaPath, 
ColumnStatistics>builder()
+  private static final Map<SchemaPath, ColumnStatistics<?>> 
DIR0_1994_SEGMENT_COLUMN_STATISTICS =
+      ImmutableMap.<SchemaPath, ColumnStatistics<?>>builder()
       .put(SchemaPath.getSimplePath("o_shippriority"),
           getColumnStatistics(0, 0, 40L, TypeProtos.MinorType.INT))
       .put(SchemaPath.getSimplePath("o_orderstatus"),
@@ -150,7 +151,8 @@ public class TestMetastoreCommands extends ClusterTest {
           getColumnStatistics(757382400000L, 788140800000L, 40L, 
TypeProtos.MinorType.DATE))
       .build();
 
-  private static final Map<SchemaPath, ColumnStatistics> 
DIR0_1994_Q1_SEGMENT_COLUMN_STATISTICS = ImmutableMap.<SchemaPath, 
ColumnStatistics>builder()
+  private static final Map<SchemaPath, ColumnStatistics<?>> 
DIR0_1994_Q1_SEGMENT_COLUMN_STATISTICS =
+      ImmutableMap.<SchemaPath, ColumnStatistics<?>>builder()
       .put(SchemaPath.getSimplePath("o_shippriority"),
           getColumnStatistics(0, 0, 10L, TypeProtos.MinorType.INT))
       .put(SchemaPath.getSimplePath("o_orderstatus"),
@@ -594,7 +596,7 @@ public class TestMetastoreCommands extends ClusterTest {
 
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
-    Map<SchemaPath, ColumnStatistics> updatedTableColumnStatistics = new 
HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> updatedTableColumnStatistics = new 
HashMap<>();
 
     SchemaPath orderStatusPath = SchemaPath.getSimplePath("o_orderstatus");
     SchemaPath dir0Path = SchemaPath.getSimplePath("dir0");
@@ -646,7 +648,7 @@ public class TestMetastoreCommands extends ClusterTest {
 
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
-    Map<SchemaPath, ColumnStatistics> updatedTableColumnStatistics = new 
HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> updatedTableColumnStatistics = new 
HashMap<>();
 
     SchemaPath dir0Path = SchemaPath.getSimplePath("dir0");
     SchemaPath dir1Path = SchemaPath.getSimplePath("dir1");
@@ -696,7 +698,7 @@ public class TestMetastoreCommands extends ClusterTest {
 
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
-    Map<SchemaPath, ColumnStatistics> updatedTableColumnStatistics = new 
HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> updatedTableColumnStatistics = new 
HashMap<>();
 
     SchemaPath orderStatusPath = SchemaPath.getSimplePath("o_orderstatus");
     SchemaPath orderDatePath = SchemaPath.getSimplePath("o_orderdate");
@@ -767,7 +769,7 @@ public class TestMetastoreCommands extends ClusterTest {
 
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
-    Map<SchemaPath, ColumnStatistics> updatedTableColumnStatistics = new 
HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> updatedTableColumnStatistics = new 
HashMap<>();
 
     SchemaPath orderStatusPath = SchemaPath.getSimplePath("o_orderstatus");
     SchemaPath orderDatePath = SchemaPath.getSimplePath("o_orderdate");
@@ -844,7 +846,7 @@ public class TestMetastoreCommands extends ClusterTest {
 
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
-    Map<SchemaPath, ColumnStatistics> updatedTableColumnStatistics = new 
HashMap<>();
+    Map<SchemaPath, ColumnStatistics<?>> updatedTableColumnStatistics = new 
HashMap<>();
 
     SchemaPath orderStatusPath = SchemaPath.getSimplePath("o_orderstatus");
     SchemaPath orderDatePath = SchemaPath.getSimplePath("o_orderdate");
@@ -977,7 +979,7 @@ public class TestMetastoreCommands extends ClusterTest {
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
     // updates statistics values due to new segment
-    Map<SchemaPath, ColumnStatistics> updatedStatistics = new 
HashMap<>(TABLE_COLUMN_STATISTICS);
+    Map<SchemaPath, ColumnStatistics<?>> updatedStatistics = new 
HashMap<>(TABLE_COLUMN_STATISTICS);
     updatedStatistics.replaceAll((logicalExpressions, columnStatistics) ->
         columnStatistics.cloneWith(new ColumnStatistics<>(
             Arrays.asList(
@@ -1068,15 +1070,15 @@ public class TestMetastoreCommands extends ClusterTest {
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
     // updates statistics values due to new segment
-    Map<SchemaPath, ColumnStatistics> updatedStatistics = new 
HashMap<>(TABLE_COLUMN_STATISTICS);
+    Map<SchemaPath, ColumnStatistics<?>> updatedStatistics = new 
HashMap<>(TABLE_COLUMN_STATISTICS);
     updatedStatistics.replaceAll((logicalExpressions, columnStatistics) ->
-        columnStatistics.cloneWith(new ColumnStatistics(
+        columnStatistics.cloneWith(new ColumnStatistics<>(
             Arrays.asList(
                 new StatisticsHolder<>(130L, TableStatisticsKind.ROW_COUNT),
                 new StatisticsHolder<>(130L, 
ColumnStatisticsKind.NON_NULL_VALUES_COUNT)))));
 
     updatedStatistics.computeIfPresent(SchemaPath.getSimplePath("dir1"), 
(logicalExpressions, columnStatistics) ->
-        columnStatistics.cloneWith(new ColumnStatistics(
+        columnStatistics.cloneWith(new ColumnStatistics<>(
             Collections.singletonList(new StatisticsHolder<>("Q5", 
ColumnStatisticsKind.MAX_VALUE)))));
 
     BaseTableMetadata expectedTableMetadata = BaseTableMetadata.builder()
@@ -1152,9 +1154,9 @@ public class TestMetastoreCommands extends ClusterTest {
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
     // updates statistics values due to new segment
-    Map<SchemaPath, ColumnStatistics> updatedStatistics = new 
HashMap<>(TABLE_COLUMN_STATISTICS);
+    Map<SchemaPath, ColumnStatistics<?>> updatedStatistics = new 
HashMap<>(TABLE_COLUMN_STATISTICS);
     updatedStatistics.replaceAll((logicalExpressions, columnStatistics) ->
-        columnStatistics.cloneWith(new ColumnStatistics(
+        columnStatistics.cloneWith(new ColumnStatistics<>(
             Arrays.asList(
                 new StatisticsHolder<>(130L, TableStatisticsKind.ROW_COUNT),
                 new StatisticsHolder<>(130L, 
ColumnStatisticsKind.NON_NULL_VALUES_COUNT)))));
@@ -1625,15 +1627,15 @@ public class TestMetastoreCommands extends ClusterTest {
           .basicRequests()
           .tableMetadata(tableInfo);
 
-      Map<SchemaPath, ColumnStatistics> tableColumnStatistics = new 
HashMap<>(TABLE_COLUMN_STATISTICS);
+      Map<SchemaPath, ColumnStatistics<?>> tableColumnStatistics = new 
HashMap<>(TABLE_COLUMN_STATISTICS);
       
tableColumnStatistics.computeIfPresent(SchemaPath.getSimplePath("o_clerk"),
           (logicalExpressions, columnStatistics) ->
-              columnStatistics.cloneWith(new ColumnStatistics(
+              columnStatistics.cloneWith(new ColumnStatistics<>(
                   Collections.singletonList(new 
StatisticsHolder<>("Clerk#000000006", ColumnStatisticsKind.MIN_VALUE)))));
 
       
tableColumnStatistics.computeIfPresent(SchemaPath.getSimplePath("o_totalprice"),
           (logicalExpressions, columnStatistics) ->
-              columnStatistics.cloneWith(new ColumnStatistics(
+              columnStatistics.cloneWith(new ColumnStatistics<>(
                   Collections.singletonList(new StatisticsHolder<>(328207.15, 
ColumnStatisticsKind.MAX_VALUE)))));
 
       BaseTableMetadata expectedTableMetadata = BaseTableMetadata.builder()
@@ -1779,7 +1781,7 @@ public class TestMetastoreCommands extends ClusterTest {
 
     TableInfo tableInfo = getTableInfo(tableName, "tmp");
 
-    Map<SchemaPath, ColumnStatistics> tableColumnStatistics = new 
HashMap<>(TABLE_COLUMN_STATISTICS);
+    Map<SchemaPath, ColumnStatistics<?>> tableColumnStatistics = new 
HashMap<>(TABLE_COLUMN_STATISTICS);
     tableColumnStatistics.remove(SchemaPath.getSimplePath("dir0"));
     tableColumnStatistics.remove(SchemaPath.getSimplePath("dir1"));
 
@@ -1800,7 +1802,7 @@ public class TestMetastoreCommands extends ClusterTest {
             getColumnStatistics(757382400000L, 764640000000L, 120L, 
TypeProtos.MinorType.DATE));
 
     tableColumnStatistics.replaceAll((logicalExpressions, columnStatistics) ->
-        columnStatistics.cloneWith(new ColumnStatistics(
+        columnStatistics.cloneWith(new ColumnStatistics<>(
             Arrays.asList(
                 new StatisticsHolder<>(10L, TableStatisticsKind.ROW_COUNT),
                 new StatisticsHolder<>(10L, 
ColumnStatisticsKind.NON_NULL_VALUES_COUNT)))));
@@ -1905,7 +1907,7 @@ public class TestMetastoreCommands extends ClusterTest {
             .resumeSchema()
         .build();
 
-    Map<SchemaPath, ColumnStatistics> columnStatistics = 
ImmutableMap.<SchemaPath, ColumnStatistics>builder()
+    Map<SchemaPath, ColumnStatistics<?>> columnStatistics = 
ImmutableMap.<SchemaPath, ColumnStatistics<?>>builder()
         .put(SchemaPath.getCompoundPath("user_info", "state"),
             getColumnStatistics("ct", "nj", 5L, TypeProtos.MinorType.VARCHAR))
         .put(SchemaPath.getSimplePath("date"),
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
index f198d3c..c44ef23 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
@@ -105,35 +105,6 @@ public class TestFileGenerator {
     props.fields.put("S_COMMENT", new FieldInfo("binary", "bin2", -1, 
bin2Vals, TypeProtos.MinorType.VARBINARY, props));
   }
 
-  private static abstract class ValueProducer {
-
-    public abstract void reset();
-    public abstract Object getValue();
-  }
-
-  private static class ValueRepeaterProducer extends ValueProducer {
-
-    WrapAroundCounter position;
-    Object[] values;
-
-    public ValueRepeaterProducer(Object[] values) {
-      this.values = values;
-      position = new WrapAroundCounter(values.length);
-    }
-
-    @Override
-    public void reset() {
-      position.reset();
-    }
-
-    @Override
-    public Object getValue() {
-      Object ret = values[position.val];
-      position.increment();
-      return ret;
-    }
-  }
-
   public static void generateParquetFile(String filename, 
ParquetTestProperties props) throws Exception {
 
     int currentBooleanByte = 0;
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
index 86a7bf9..e51e311 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -608,13 +608,13 @@ public class TestParquetFilterPushDown extends 
PlanTestBase {
     
Mockito.when(booleanStatistics.get(ColumnStatisticsKind.MIN_VALUE)).thenReturn(false);
 // min false
     
Mockito.when(booleanStatistics.get(ColumnStatisticsKind.MAX_VALUE)).thenReturn(true);
 // max true
     
Mockito.when(booleanStatistics.getValueComparator()).thenReturn(Comparator.nullsFirst(Comparator.naturalOrder()));
 // comparator
-    IsPredicate isTrue = (IsPredicate) 
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_TRUE, le);
+    IsPredicate<Boolean> isTrue = (IsPredicate<Boolean>) 
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_TRUE, le);
     assertEquals(RowsMatch.SOME, isTrue.matches(re));
-    IsPredicate isFalse = (IsPredicate)  
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_FALSE, le);
+    IsPredicate<Boolean> isFalse = (IsPredicate<Boolean>) 
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_FALSE, le);
     assertEquals(RowsMatch.SOME, isFalse.matches(re));
-    IsPredicate isNotTrue = (IsPredicate)  
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_TRUE, le);
+    IsPredicate<Boolean> isNotTrue = (IsPredicate<Boolean>) 
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_TRUE, le);
     assertEquals(RowsMatch.SOME, isNotTrue.matches(re));
-    IsPredicate isNotFalse = (IsPredicate)  
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_FALSE, le);
+    IsPredicate<Boolean> isNotFalse = (IsPredicate<Boolean>) 
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_FALSE, le);
     assertEquals(RowsMatch.SOME, isNotFalse.matches(re));
   }
 
@@ -631,13 +631,13 @@ public class TestParquetFilterPushDown extends 
PlanTestBase {
     
Mockito.when(booleanStatistics.get(ColumnStatisticsKind.MIN_VALUE)).thenReturn(false);
 // min false
     
Mockito.when(booleanStatistics.get(ColumnStatisticsKind.MAX_VALUE)).thenReturn(false);
 // max false
     
Mockito.when(booleanStatistics.getValueComparator()).thenReturn(Comparator.nullsFirst(Comparator.naturalOrder()));
 // comparator
-    IsPredicate isTrue = (IsPredicate) 
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_TRUE, le);
+    IsPredicate<Boolean> isTrue = (IsPredicate<Boolean>) 
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_TRUE, le);
     assertEquals(RowsMatch.NONE, isTrue.matches(re));
-    IsPredicate isFalse = (IsPredicate)  
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_FALSE, le);
+    IsPredicate<Boolean> isFalse = (IsPredicate<Boolean>) 
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_FALSE, le);
     assertEquals(RowsMatch.ALL, isFalse.matches(re));
-    IsPredicate isNotTrue = (IsPredicate)  
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_TRUE, le);
+    IsPredicate<Boolean> isNotTrue = (IsPredicate<Boolean>) 
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_TRUE, le);
     assertEquals(RowsMatch.ALL, isNotTrue.matches(re));
-    IsPredicate isNotFalse = (IsPredicate)  
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_FALSE, le);
+    IsPredicate<Boolean> isNotFalse = (IsPredicate<Boolean>) 
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_FALSE, le);
     assertEquals(RowsMatch.NONE, isNotFalse.matches(re));
   }
 
@@ -653,13 +653,13 @@ public class TestParquetFilterPushDown extends 
PlanTestBase {
     
Mockito.when(booleanStatistics.get(ColumnStatisticsKind.NULLS_COUNT)).thenReturn(0L);
 // no nulls
     
Mockito.when(booleanStatistics.get(ColumnStatisticsKind.MIN_VALUE)).thenReturn(true);
 // min false
     
Mockito.when(booleanStatistics.get(ColumnStatisticsKind.MAX_VALUE)).thenReturn(true);
 // max false
-    IsPredicate isTrue = (IsPredicate) 
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_TRUE, le);
+    IsPredicate<Boolean> isTrue = (IsPredicate<Boolean>) 
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_TRUE, le);
     assertEquals(RowsMatch.ALL, isTrue.matches(re));
-    IsPredicate isFalse = (IsPredicate)  
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_FALSE, le);
+    IsPredicate<Boolean> isFalse = (IsPredicate<Boolean>) 
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_FALSE, le);
     assertEquals(RowsMatch.NONE, isFalse.matches(re));
-    IsPredicate isNotTrue = (IsPredicate)  
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_TRUE, le);
+    IsPredicate<Boolean> isNotTrue = (IsPredicate<Boolean>) 
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_TRUE, le);
     assertEquals(RowsMatch.NONE, isNotTrue.matches(re));
-    IsPredicate isNotFalse = (IsPredicate)  
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_FALSE, le);
+    IsPredicate<Boolean> isNotFalse = (IsPredicate<Boolean>) 
IsPredicate.createIsPredicate(FunctionGenerationHelper.IS_NOT_FALSE, le);
     assertEquals(RowsMatch.ALL, isNotFalse.matches(re));
   }
 
diff --git 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesTransformer.java
 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesTransformer.java
index 863ba9d..d25fd2e 100644
--- 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesTransformer.java
+++ 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesTransformer.java
@@ -99,6 +99,9 @@ public class BasicTablesTransformer {
         case PARTITION:
           
partitions.add(PartitionMetadata.builder().metadataUnit(unit).build());
           break;
+        default:
+          // Ignore unsupported type
+          break;
       }
     }
     return new MetadataHolder(tables, segments, files, rowGroups, partitions);
diff --git 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TableMetadataUnit.java
 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TableMetadataUnit.java
index 7863a4d..1d6ed84 100644
--- 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TableMetadataUnit.java
+++ 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TableMetadataUnit.java
@@ -511,6 +511,8 @@ public class TableMetadataUnit {
               rowGroupColumns.add(name);
               partitionColumns.add(name);
               break;
+            default:
+              throw new IllegalStateException(scope.name());
           }
         }
 
diff --git 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/exceptions/MetastoreException.java
 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/exceptions/MetastoreException.java
index 96516df..f94d128 100644
--- 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/exceptions/MetastoreException.java
+++ 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/exceptions/MetastoreException.java
@@ -18,9 +18,11 @@
 package org.apache.drill.metastore.exceptions;
 
 /**
- * Drill Metastore runtime exception to indicate that exception was caused by 
Drill Metastore.
- * Drill Metastore implementations can use or extend it to throw Metastore 
specific exceptions.
+ * Drill Metastore runtime exception to indicate that exception was caused by
+ * Drill Metastore. Drill Metastore implementations can use or extend it to
+ * throw Metastore specific exceptions.
  */
+@SuppressWarnings("serial")
 public class MetastoreException extends RuntimeException {
 
   public MetastoreException(String message, Throwable cause) {
diff --git 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/BaseMetadata.java
 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/BaseMetadata.java
index ae6c547..c24b54f 100644
--- 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/BaseMetadata.java
+++ 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/BaseMetadata.java
@@ -42,8 +42,8 @@ public abstract class BaseMetadata implements Metadata {
   protected final TableInfo tableInfo;
   protected final MetadataInfo metadataInfo;
   protected final TupleMetadata schema;
-  protected final Map<SchemaPath, ColumnStatistics> columnsStatistics;
-  protected final Map<String, StatisticsHolder> metadataStatistics;
+  protected final Map<SchemaPath, ColumnStatistics<?>> columnsStatistics;
+  protected final Map<String, StatisticsHolder<?>> metadataStatistics;
   protected final long lastModifiedTime;
 
   protected <T extends BaseMetadataBuilder<T>> 
BaseMetadata(BaseMetadataBuilder<T> builder) {
@@ -60,12 +60,12 @@ public abstract class BaseMetadata implements Metadata {
   }
 
   @Override
-  public Map<SchemaPath, ColumnStatistics> getColumnsStatistics() {
+  public Map<SchemaPath, ColumnStatistics<?>> getColumnsStatistics() {
     return columnsStatistics;
   }
 
   @Override
-  public ColumnStatistics getColumnStatistics(SchemaPath columnName) {
+  public ColumnStatistics<?> getColumnStatistics(SchemaPath columnName) {
     return columnsStatistics.get(columnName);
   }
 
@@ -77,20 +77,20 @@ public abstract class BaseMetadata implements Metadata {
   @Override
   @SuppressWarnings("unchecked")
   public <V> V getStatistic(StatisticsKind<V> statisticsKind) {
-    StatisticsHolder<V> statisticsHolder = 
metadataStatistics.get(statisticsKind.getName());
+    StatisticsHolder<V> statisticsHolder = (StatisticsHolder<V>)
+        metadataStatistics.get(statisticsKind.getName());
     return statisticsHolder != null ? statisticsHolder.getStatisticsValue() : 
null;
   }
 
   @Override
-  public boolean containsExactStatistics(StatisticsKind statisticsKind) {
-    StatisticsHolder statisticsHolder = 
metadataStatistics.get(statisticsKind.getName());
+  public boolean containsExactStatistics(StatisticsKind<?> statisticsKind) {
+    StatisticsHolder<?> statisticsHolder = 
metadataStatistics.get(statisticsKind.getName());
     return statisticsHolder != null && 
statisticsHolder.getStatisticsKind().isExact();
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   public <V> V getStatisticsForColumn(SchemaPath columnName, StatisticsKind<V> 
statisticsKind) {
-    return (V) columnsStatistics.get(columnName).get(statisticsKind);
+    return columnsStatistics.get(columnName).get(statisticsKind);
   }
 
   @Override
@@ -172,14 +172,14 @@ public abstract class BaseMetadata implements Metadata {
 
   protected abstract void toMetadataUnitBuilder(TableMetadataUnit.Builder 
builder);
 
-  protected abstract BaseMetadataBuilder toBuilder();
+  protected abstract BaseMetadataBuilder<?> toBuilder();
 
   public static abstract class BaseMetadataBuilder<T extends 
BaseMetadataBuilder<T>> {
     protected TableInfo tableInfo;
     protected MetadataInfo metadataInfo;
     protected TupleMetadata schema;
-    protected Map<SchemaPath, ColumnStatistics> columnsStatistics;
-    protected Collection<StatisticsHolder> metadataStatistics;
+    protected Map<SchemaPath, ColumnStatistics<?>> columnsStatistics;
+    protected Collection<StatisticsHolder<?>> metadataStatistics;
     protected long lastModifiedTime = UNDEFINED_TIME;
 
     public T tableInfo(TableInfo tableInfo) {
@@ -197,12 +197,12 @@ public abstract class BaseMetadata implements Metadata {
       return self();
     }
 
-    public T columnsStatistics(Map<SchemaPath, ColumnStatistics> 
columnsStatistics) {
+    public T columnsStatistics(Map<SchemaPath, ColumnStatistics<?>> 
columnsStatistics) {
       this.columnsStatistics = columnsStatistics;
       return self();
     }
 
-    public T metadataStatistics(Collection<StatisticsHolder> 
metadataStatistics) {
+    public T metadataStatistics(Collection<StatisticsHolder<?>> 
metadataStatistics) {
       this.metadataStatistics = metadataStatistics;
       return self();
     }
diff --git 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/BaseTableMetadata.java
 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/BaseTableMetadata.java
index 3b6922d..839cd51 100644
--- 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/BaseTableMetadata.java
+++ 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/BaseTableMetadata.java
@@ -102,9 +102,8 @@ public class BaseTableMetadata extends BaseMetadata 
implements TableMetadata {
   }
 
   @Override
-  @SuppressWarnings("unchecked")
-  public BaseTableMetadata cloneWithStats(Map<SchemaPath, ColumnStatistics> 
columnStatistics, List<StatisticsHolder> tableStatistics) {
-    Map<String, StatisticsHolder> mergedTableStatistics = new 
HashMap<>(this.metadataStatistics);
+  public BaseTableMetadata cloneWithStats(Map<SchemaPath, ColumnStatistics<?>> 
columnStatistics, List<StatisticsHolder<?>> tableStatistics) {
+    Map<String, StatisticsHolder<?>> mergedTableStatistics = new 
HashMap<>(this.metadataStatistics);
 
     // overrides statistics value for the case when new statistics is exact or 
existing one was estimated
     tableStatistics.stream()
@@ -113,12 +112,12 @@ public class BaseTableMetadata extends BaseMetadata 
implements TableMetadata {
               || 
!this.metadataStatistics.get(statisticsHolder.getStatisticsKind().getName()).getStatisticsKind().isExact())
         .forEach(statisticsHolder -> 
mergedTableStatistics.put(statisticsHolder.getStatisticsKind().getName(), 
statisticsHolder));
 
-    Map<SchemaPath, ColumnStatistics> newColumnsStatistics = new 
HashMap<>(this.columnsStatistics);
+    Map<SchemaPath, ColumnStatistics<?>> newColumnsStatistics = new 
HashMap<>(this.columnsStatistics);
     this.columnsStatistics.forEach(
         (columnName, value) -> {
-          ColumnStatistics sourceStatistics = columnStatistics.get(columnName);
+          ColumnStatistics<?> sourceStatistics = 
columnStatistics.get(columnName);
           if (sourceStatistics != null) {
-            newColumnsStatistics.put(columnName, 
value.cloneWith(sourceStatistics));
+            newColumnsStatistics.put(columnName, 
value.genericClone(sourceStatistics));
           }
         });
 
@@ -148,6 +147,7 @@ public class BaseTableMetadata extends BaseMetadata 
implements TableMetadata {
     }
   }
 
+  @Override
   public BaseTableMetadataBuilder toBuilder() {
     return builder()
         .tableInfo(tableInfo)
diff --git 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/Metadata.java
 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/Metadata.java
index 6b40d95..3d99ae7 100644
--- 
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/Metadata.java
+++ 
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/Metadata.java
@@ -37,7 +37,7 @@ public interface Metadata {
    *
    * @return statistics stored in current metadata
    */
-  Map<SchemaPath, ColumnStatistics> getColumnsStatistics();
+  Map<SchemaPath, ColumnStatistics<?>> getColumnsStatistics();
 
   /**
    * Returns statistics for specified column stored in current metadata.
@@ -45,7 +45,7 @@ public interface Metadata {
    * @param columnName column whose statistics should be returned
    * @return statistics for specified column
    */
-  ColumnStatistics getColumnStatistics(SchemaPath columnName);
+  ColumnStatistics<?> getColumnStatistics(SchemaPath columnName);
 
   /**
    * Returns schema stored in current metadata represented as
@@ -70,7 +70,7 @@ public interface Metadata {
    * @param statisticsKind statistics kind to check
    * @return true if value which corresponds to the specified statistics kind 
is exact
    */
-  boolean containsExactStatistics(StatisticsKind statisticsKind);
+  boolean containsExactStatistics(StatisticsKind<?> statisticsKind);
 
   /**
    * Returns value of column statistics which corresponds to specified {@link 
StatisticsKind}
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/NonInterestingColumnsMetadata.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/NonInterestingColumnsMetadata.java
index 6944ab0..26cf665 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/NonInterestingColumnsMetadata.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/NonInterestingColumnsMetadata.java
@@ -34,19 +34,19 @@ import java.util.Map;
  * to NonInterestingColumnsMetadata.
  */
 public class NonInterestingColumnsMetadata implements Metadata {
-  private final Map<SchemaPath, ColumnStatistics> columnsStatistics;
+  private final Map<SchemaPath, ColumnStatistics<?>> columnsStatistics;
 
-  public NonInterestingColumnsMetadata(Map<SchemaPath, ColumnStatistics> 
columnsStatistics) {
+  public NonInterestingColumnsMetadata(Map<SchemaPath, ColumnStatistics<?>> 
columnsStatistics) {
     this.columnsStatistics = columnsStatistics;
   }
 
   @Override
-  public Map<SchemaPath, ColumnStatistics> getColumnsStatistics() {
+  public Map<SchemaPath, ColumnStatistics<?>> getColumnsStatistics() {
     return columnsStatistics;
   }
 
   @Override
-  public ColumnStatistics getColumnStatistics(SchemaPath columnName) {
+  public ColumnStatistics<?> getColumnStatistics(SchemaPath columnName) {
     return columnsStatistics.get(columnName);
   }
 
@@ -61,14 +61,13 @@ public class NonInterestingColumnsMetadata implements Metadata {
   }
 
   @Override
-  public boolean containsExactStatistics(StatisticsKind statisticsKind) {
+  public boolean containsExactStatistics(StatisticsKind<?> statisticsKind) {
     return false;
   }
 
   @Override
-  @SuppressWarnings("unchecked")
   public <V> V getStatisticsForColumn(SchemaPath columnName, StatisticsKind<V> 
statisticsKind) {
-    return (V) columnsStatistics.get(columnName).get(statisticsKind);
+    return columnsStatistics.get(columnName).get(statisticsKind);
   }
 
   @Override
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/TableMetadata.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/TableMetadata.java
index 517d232..d4704fc 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/TableMetadata.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/TableMetadata.java
@@ -32,6 +32,7 @@ public interface TableMetadata extends Metadata {
 
   Path getLocation();
   long getLastModifiedTime();
-  TableMetadata cloneWithStats(Map<SchemaPath, ColumnStatistics> 
columnStatistics, List<StatisticsHolder> tableStatistics);
+  TableMetadata cloneWithStats(Map<SchemaPath, ColumnStatistics<?>> 
columnStatistics,
+      List<StatisticsHolder<?>> tableStatistics);
   List<SchemaPath> getInterestingColumns();
 }
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/CollectableColumnStatisticsKind.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/CollectableColumnStatisticsKind.java
index 00b1f1d..8c4a2a8 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/CollectableColumnStatisticsKind.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/CollectableColumnStatisticsKind.java
@@ -32,5 +32,5 @@ public interface CollectableColumnStatisticsKind<V> extends StatisticsKind<V> {
    * @param statistics list of {@link ColumnStatistics} instances to be 
collected
    * @return column statistics value received by collecting specified {@link 
ColumnStatistics}
    */
-  Object mergeStatistics(List<? extends ColumnStatistics> statistics);
+  Object mergeStatistics(List<? extends ColumnStatistics<?>> statistics);
 }
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatistics.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatistics.java
index 0cb33db..5fbb85a 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatistics.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatistics.java
@@ -74,13 +74,13 @@ public class ColumnStatistics<T> {
       .registerModule(new JodaModule())
       .readerFor(ColumnStatistics.class);
 
-  private final Map<String, StatisticsHolder> statistics;
+  private final Map<String, StatisticsHolder<?>> statistics;
   private final Comparator<T> valueComparator;
   private final TypeProtos.MinorType type;
 
   @JsonCreator
   @SuppressWarnings("unchecked")
-  public ColumnStatistics(@JsonProperty("statistics") 
Collection<StatisticsHolder> statistics,
+  public ColumnStatistics(@JsonProperty("statistics") 
Collection<StatisticsHolder<?>> statistics,
                           @JsonProperty("type") TypeProtos.MinorType type) {
     this.type = type;
     this.valueComparator = type != null
@@ -93,7 +93,7 @@ public class ColumnStatistics<T> {
             (a, b) -> a.getStatisticsKind().isExact() ? a : b));
   }
 
-  public ColumnStatistics(Collection<StatisticsHolder> statistics) {
+  public ColumnStatistics(Collection<StatisticsHolder<?>> statistics) {
     this(statistics, TypeProtos.MinorType.INT);
   }
 
@@ -105,7 +105,8 @@ public class ColumnStatistics<T> {
    */
   @SuppressWarnings("unchecked")
   public <V> V get(StatisticsKind<V> statisticsKind) {
-    StatisticsHolder<V> statisticsHolder = 
statistics.get(statisticsKind.getName());
+    StatisticsHolder<V> statisticsHolder = (StatisticsHolder<V>)
+        statistics.get(statisticsKind.getName());
     if (statisticsHolder != null) {
       return statisticsHolder.getStatisticsValue();
     }
@@ -118,7 +119,7 @@ public class ColumnStatistics<T> {
    * @param statisticsKind statistics kind to check
    * @return true if specified statistics kind is set
    */
-  public boolean contains(StatisticsKind statisticsKind) {
+  public boolean contains(StatisticsKind<?> statisticsKind) {
     return statistics.containsKey(statisticsKind.getName());
   }
 
@@ -129,8 +130,8 @@ public class ColumnStatistics<T> {
    * @param statisticsKind statistics kind to check
    * @return true if value which corresponds to the specified statistics kind 
is exact
    */
-  public boolean containsExact(StatisticsKind statisticsKind) {
-    StatisticsHolder statisticsHolder = 
statistics.get(statisticsKind.getName());
+  public boolean containsExact(StatisticsKind<?> statisticsKind) {
+    StatisticsHolder<?> statisticsHolder = 
statistics.get(statisticsKind.getName());
     if (statisticsHolder != null) {
       return statisticsHolder.getStatisticsKind().isExact();
     }
@@ -153,10 +154,10 @@ public class ColumnStatistics<T> {
    * @return new {@link ColumnStatistics} instance with overridden statistics
    */
   public ColumnStatistics<T> cloneWith(ColumnStatistics<T> sourceStatistics) {
-    Map<String, StatisticsHolder> newStats = new HashMap<>(this.statistics);
+    Map<String, StatisticsHolder<?>> newStats = new HashMap<>(this.statistics);
     sourceStatistics.statistics.values().forEach(statisticsHolder -> {
-      StatisticsKind statisticsKindToMerge = 
statisticsHolder.getStatisticsKind();
-      StatisticsHolder oldStatistics = 
statistics.get(statisticsKindToMerge.getName());
+      StatisticsKind<?> statisticsKindToMerge = 
statisticsHolder.getStatisticsKind();
+      StatisticsHolder<?> oldStatistics = 
statistics.get(statisticsKindToMerge.getName());
       if (oldStatistics == null
           || !oldStatistics.getStatisticsKind().isExact()
           || statisticsKindToMerge.isExact()) {
@@ -167,9 +168,14 @@ public class ColumnStatistics<T> {
     return new ColumnStatistics<>(newStats.values(), type);
   }
 
+  @SuppressWarnings("unchecked")
+  public ColumnStatistics<T> genericClone(ColumnStatistics<?> 
sourceStatistics) {
+    return cloneWith((ColumnStatistics<T>) sourceStatistics);
+  }
+
   @JsonProperty("statistics")
   @SuppressWarnings("unused") // used for serialization
-  private Collection<StatisticsHolder> getAll() {
+  private Collection<StatisticsHolder<?>> getAll() {
     return statistics.values();
   }
 
@@ -212,7 +218,7 @@ public class ColumnStatistics<T> {
         .toString();
   }
 
-  public static ColumnStatistics of(String columnStatistics) {
+  public static ColumnStatistics<?> of(String columnStatistics) {
     try {
       return OBJECT_READER.readValue(columnStatistics);
     } catch (IOException e) {
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatisticsKind.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatisticsKind.java
index 613b602..5cfed03 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatisticsKind.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/ColumnStatisticsKind.java
@@ -19,6 +19,7 @@ package org.apache.drill.metastore.statistics;
 
 import org.apache.drill.metastore.metadata.BaseMetadata;
 
+import java.util.Comparator;
 import java.util.List;
 
 /**
@@ -33,7 +34,7 @@ public class ColumnStatisticsKind<T> extends BaseStatisticsKind<T> implements Co
   public static final ColumnStatisticsKind<Long> NULLS_COUNT =
       new ColumnStatisticsKind<Long>(ExactStatisticsConstants.NULLS_COUNT, 
true) {
         @Override
-        public Long mergeStatistics(List<? extends ColumnStatistics> 
statisticsList) {
+        public Long mergeStatistics(List<? extends ColumnStatistics<?>> 
statisticsList) {
           long nullsCount = 0;
           for (ColumnStatistics<?> statistics : statisticsList) {
             Long statNullsCount = statistics.get(this);
@@ -47,7 +48,7 @@ public class ColumnStatisticsKind<T> extends BaseStatisticsKind<T> implements Co
         }
 
         @Override
-        public Long getFrom(ColumnStatistics metadata) {
+        public Long getFrom(ColumnStatistics<?> metadata) {
           Long rowCount = super.getFrom(metadata);
           return rowCount != null ? rowCount : Statistic.NO_COLUMN_STATS;
         }
@@ -60,11 +61,12 @@ public class ColumnStatisticsKind<T> extends BaseStatisticsKind<T> implements Co
       new ColumnStatisticsKind<Object>(ExactStatisticsConstants.MIN_VALUE, 
true) {
         @Override
         @SuppressWarnings("unchecked")
-        public Object mergeStatistics(List<? extends ColumnStatistics> 
statisticsList) {
+        public Object mergeStatistics(List<? extends ColumnStatistics<?>> 
statisticsList) {
           Object minValue = null;
-          for (ColumnStatistics statistics : statisticsList) {
+          for (ColumnStatistics<?> statistics : statisticsList) {
             Object statMinValue = getValueStatistic(statistics);
-            if (statMinValue != null && 
(statistics.getValueComparator().compare(minValue, statMinValue) > 0 || 
minValue == null)) {
+            Comparator<Object> comp = (Comparator<Object>) 
statistics.getValueComparator();
+            if (statMinValue != null && (comp.compare(minValue, statMinValue) 
> 0 || minValue == null)) {
               minValue = statMinValue;
             }
           }
@@ -79,11 +81,12 @@ public class ColumnStatisticsKind<T> extends BaseStatisticsKind<T> implements Co
       new ColumnStatisticsKind<Object>(ExactStatisticsConstants.MAX_VALUE, 
true) {
         @Override
         @SuppressWarnings("unchecked")
-        public Object mergeStatistics(List<? extends ColumnStatistics> 
statisticsList) {
+        public Object mergeStatistics(List<? extends ColumnStatistics<?>> 
statisticsList) {
           Object maxValue = null;
-          for (ColumnStatistics statistics : statisticsList) {
+          for (ColumnStatistics<?> statistics : statisticsList) {
             Object statMaxValue = getValueStatistic(statistics);
-            if (statMaxValue != null && 
statistics.getValueComparator().compare(maxValue, statMaxValue) < 0) {
+            Comparator<Object> comp = (Comparator<Object>) 
statistics.getValueComparator();
+            if (statMaxValue != null && comp.compare(maxValue, statMaxValue) < 
0) {
               maxValue = statMaxValue;
             }
           }
@@ -97,7 +100,7 @@ public class ColumnStatisticsKind<T> extends BaseStatisticsKind<T> implements Co
   public static final ColumnStatisticsKind<Long> NON_NULL_VALUES_COUNT =
       new 
ColumnStatisticsKind<Long>(ExactStatisticsConstants.NON_NULL_VALUES_COUNT, 
true) {
         @Override
-        public Long mergeStatistics(List<? extends ColumnStatistics> 
statisticsList) {
+        public Long mergeStatistics(List<? extends ColumnStatistics<?>> 
statisticsList) {
           long nonNullRowCount = 0;
           for (ColumnStatistics<?> statistics : statisticsList) {
             Long nnRowCount = statistics.get(this);
@@ -115,7 +118,7 @@ public class ColumnStatisticsKind<T> extends BaseStatisticsKind<T> implements Co
   public static final ColumnStatisticsKind<Double> NON_NULL_COUNT =
       new ColumnStatisticsKind<Double>(Statistic.NNROWCOUNT, false) {
         @Override
-        public Double mergeStatistics(List<? extends ColumnStatistics> 
statisticsList) {
+        public Double mergeStatistics(List<? extends ColumnStatistics<?>> 
statisticsList) {
           double nonNullRowCount = 0;
           for (ColumnStatistics<?> statistics : statisticsList) {
             Double nnRowCount = statistics.get(this);
@@ -133,7 +136,7 @@ public class ColumnStatisticsKind<T> extends BaseStatisticsKind<T> implements Co
   public static final ColumnStatisticsKind<Double> ROWCOUNT =
       new ColumnStatisticsKind<Double>(Statistic.ROWCOUNT, false) {
         @Override
-        public Double mergeStatistics(List<? extends ColumnStatistics> 
statisticsList) {
+        public Double mergeStatistics(List<? extends ColumnStatistics<?>> 
statisticsList) {
           double rowCount = 0;
           for (ColumnStatistics<?> statistics : statisticsList) {
             Double count = getFrom(statistics);
@@ -154,8 +157,8 @@ public class ColumnStatisticsKind<T> extends BaseStatisticsKind<T> implements Co
   /**
    * Column statistics kind which is the width of the specific column.
    */
-  public static final ColumnStatisticsKind AVG_WIDTH =
-      new ColumnStatisticsKind(Statistic.AVG_WIDTH, false);
+  public static final ColumnStatisticsKind<?> AVG_WIDTH =
+      new ColumnStatisticsKind<>(Statistic.AVG_WIDTH, false);
 
   /**
    * Column statistics kind which is the histogram of the specific column.
@@ -184,7 +187,7 @@ public class ColumnStatisticsKind<T> extends BaseStatisticsKind<T> implements Co
   }
 
   @Override
-  public T mergeStatistics(List<? extends ColumnStatistics> statistics) {
+  public T mergeStatistics(List<? extends ColumnStatistics<?>> statistics) {
     throw new UnsupportedOperationException("Cannot merge statistics for " + 
statisticKey);
   }
 }
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/StatisticsHolder.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/StatisticsHolder.java
index 94e8b10..ffbe3ac 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/StatisticsHolder.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/statistics/StatisticsHolder.java
@@ -42,19 +42,19 @@ public class StatisticsHolder<T> {
   private static final ObjectReader OBJECT_READER = new 
ObjectMapper().readerFor(StatisticsHolder.class);
 
   private final T statisticsValue;
-  private final BaseStatisticsKind statisticsKind;
+  private final BaseStatisticsKind<?> statisticsKind;
 
   @JsonCreator
   public StatisticsHolder(@JsonProperty("statisticsValue") T statisticsValue,
-                          @JsonProperty("statisticsKind") BaseStatisticsKind 
statisticsKind) {
+                          @JsonProperty("statisticsKind") 
BaseStatisticsKind<?> statisticsKind) {
     this.statisticsValue = statisticsValue;
     this.statisticsKind = statisticsKind;
   }
 
   public StatisticsHolder(T statisticsValue,
-                          StatisticsKind statisticsKind) {
+                          StatisticsKind<?> statisticsKind) {
     this.statisticsValue = statisticsValue;
-    this.statisticsKind = (BaseStatisticsKind) statisticsKind;
+    this.statisticsKind = (BaseStatisticsKind<?>) statisticsKind;
   }
 
   @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS,
@@ -63,7 +63,7 @@ public class StatisticsHolder<T> {
     return statisticsValue;
   }
 
-  public StatisticsKind getStatisticsKind() {
+  public StatisticsKind<?> getStatisticsKind() {
     return statisticsKind;
   }
 
@@ -101,7 +101,7 @@ public class StatisticsHolder<T> {
         .toString();
   }
 
-  public static StatisticsHolder of(String serialized) {
+  public static StatisticsHolder<?> of(String serialized) {
     try {
       return OBJECT_READER.readValue(serialized);
     } catch (IOException e) {
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/TableMetadataUtils.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/TableMetadataUtils.java
index 1aac570..2443663 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/TableMetadataUtils.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/TableMetadataUtils.java
@@ -46,21 +46,32 @@ public class TableMetadataUtils {
    * @param type type of the column
    * @return {@link Comparator} instance
    */
-  public static Comparator getComparator(TypeProtos.MinorType type) {
+  @SuppressWarnings("unchecked")
+  public static <T> Comparator<T> getComparator(TypeProtos.MinorType type) {
     switch (type) {
       case INTERVALDAY:
       case INTERVAL:
       case INTERVALYEAR:
-        return 
Comparator.nullsFirst(UnsignedBytes.lexicographicalComparator());
+        // This odd cast is needed because this method is poorly designed.
+        // The method is statically typed to type T. But, the type
+        // is selected dynamically at runtime via the type parameter.
+        // As a result, we are casting a comparator to the WRONG type
+        // in some cases. We have to remove the byte[] type, then force
+        // the type to T. This works because we should only use this
+        // case if T is byte[]. But, this is a horrible hack and should
+        // be fixed.
+        return (Comparator<T>) (Comparator<?>)
+            Comparator.nullsFirst(UnsignedBytes.lexicographicalComparator());
       case UINT1:
-        return Comparator.nullsFirst(UnsignedBytes::compare);
+        return (Comparator<T>)
+            Comparator.nullsFirst(UnsignedBytes::compare);
       case UINT2:
       case UINT4:
-        return Comparator.nullsFirst(Integer::compareUnsigned);
+        return (Comparator<T>) Comparator.nullsFirst(Integer::compareUnsigned);
       case UINT8:
-        return Comparator.nullsFirst(Long::compareUnsigned);
+        return (Comparator<T>) Comparator.nullsFirst(Long::compareUnsigned);
       default:
-        return getNaturalNullsFirstComparator();
+        return (Comparator<T>) getNaturalNullsFirstComparator();
     }
   }
 
@@ -83,14 +94,15 @@ public class TableMetadataUtils {
    * @param statisticsToCollect kinds of statistics that should be collected
    * @return list of merged metadata
    */
-  public static <T extends BaseMetadata> Map<SchemaPath, ColumnStatistics> 
mergeColumnsStatistics(
-      Collection<T> metadataList, Set<SchemaPath> columns, 
List<CollectableColumnStatisticsKind> statisticsToCollect) {
-    Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public static <T extends BaseMetadata> Map<SchemaPath, ColumnStatistics<?>> 
mergeColumnsStatistics(
+      Collection<T> metadataList, Set<SchemaPath> columns, 
List<CollectableColumnStatisticsKind<?>> statisticsToCollect) {
+    Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = new HashMap<>();
 
     for (SchemaPath column : columns) {
-      List<ColumnStatistics> statisticsList = new ArrayList<>();
+      List<ColumnStatistics<?>> statisticsList = new ArrayList<>();
       for (T metadata : metadataList) {
-        ColumnStatistics statistics = 
metadata.getColumnsStatistics().get(column);
+        ColumnStatistics<?> statistics = 
metadata.getColumnsStatistics().get(column);
         if (statistics == null) {
           // schema change happened, set statistics which represents all nulls
           statistics = new ColumnStatistics(
@@ -99,12 +111,12 @@ public class TableMetadataUtils {
         }
         statisticsList.add(statistics);
       }
-      List<StatisticsHolder> statisticsHolders = new ArrayList<>();
-      for (CollectableColumnStatisticsKind statisticsKind : 
statisticsToCollect) {
+      List<StatisticsHolder<?>> statisticsHolders = new ArrayList<>();
+      for (CollectableColumnStatisticsKind<?> statisticsKind : 
statisticsToCollect) {
         Object mergedStatistic = 
statisticsKind.mergeStatistics(statisticsList);
         statisticsHolders.add(new StatisticsHolder<>(mergedStatistic, 
statisticsKind));
       }
-      Iterator<ColumnStatistics> iterator = statisticsList.iterator();
+      Iterator<ColumnStatistics<?>> iterator = statisticsList.iterator();
       // Use INT if statistics wasn't provided
       TypeProtos.MinorType comparatorType = iterator.hasNext() ? 
iterator.next().getComparatorType() : TypeProtos.MinorType.INT;
       columnsStatistics.put(column, new ColumnStatistics<>(statisticsHolders, 
comparatorType));
@@ -120,13 +132,13 @@ public class TableMetadataUtils {
    * @return new {@link TableMetadata} instance with updated statistics
    */
   public static TableMetadata updateRowCount(TableMetadata tableMetadata, 
Collection<? extends BaseMetadata> statistics) {
-    List<StatisticsHolder> newStats = new ArrayList<>();
+    List<StatisticsHolder<?>> newStats = new ArrayList<>();
 
     newStats.add(new 
StatisticsHolder<>(TableStatisticsKind.ROW_COUNT.mergeStatistics(statistics), 
TableStatisticsKind.ROW_COUNT));
 
     Set<SchemaPath> columns = tableMetadata.getColumnsStatistics().keySet();
 
-    Map<SchemaPath, ColumnStatistics> columnsStatistics =
+    Map<SchemaPath, ColumnStatistics<?>> columnsStatistics =
         mergeColumnsStatistics(statistics, columns,
             Collections.singletonList(ColumnStatisticsKind.NULLS_COUNT));
 
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTableMetadataUnitConversion.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTableMetadataUnitConversion.java
index c31c254..7a6212c 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTableMetadataUnitConversion.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTableMetadataUnitConversion.java
@@ -387,9 +387,9 @@ public class TestTableMetadataUnitConversion extends BaseTest {
 
     private final TableInfo fullTableInfo;
     private final TableInfo basicTableInfo;
-    private final Map<SchemaPath, ColumnStatistics> columnsStatistics;
+    private final Map<SchemaPath, ColumnStatistics<?>> columnsStatistics;
     private final Map<String, String> unitColumnsStatistics;
-    private final Collection<StatisticsHolder> metadataStatistics;
+    private final Collection<StatisticsHolder<?>> metadataStatistics;
     private final List<String> unitMetadataStatistics;
     private final TupleMetadata schema;
     private final String unitSchema;
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/metadata/MetadataSerDeTest.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/metadata/MetadataSerDeTest.java
index 520e59e..bc6fad2 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/metadata/MetadataSerDeTest.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/metadata/MetadataSerDeTest.java
@@ -70,7 +70,7 @@ public class MetadataSerDeTest extends BaseTest {
 
   @Test
   public void testColumnStatisticsSerialization() {
-    List<StatisticsHolder> statistics = Arrays.asList(
+    List<StatisticsHolder<?>> statistics = Arrays.asList(
         new StatisticsHolder<>("aaa", ColumnStatisticsKind.MIN_VALUE),
         new StatisticsHolder<>("zzz", ColumnStatisticsKind.MAX_VALUE),
         new StatisticsHolder<>(3, ColumnStatisticsKind.NULLS_COUNT),
@@ -95,7 +95,7 @@ public class MetadataSerDeTest extends BaseTest {
 
   @Test
   public void testColumnStatisticsDeserialization() {
-    List<StatisticsHolder> statistics = Arrays.asList(
+    List<StatisticsHolder<?>> statistics = Arrays.asList(
         new StatisticsHolder<>("aaa", ColumnStatisticsKind.MIN_VALUE),
         new StatisticsHolder<>("zzz", ColumnStatisticsKind.MAX_VALUE),
         new StatisticsHolder<>(3, ColumnStatisticsKind.NULLS_COUNT),
@@ -103,13 +103,13 @@ public class MetadataSerDeTest extends BaseTest {
     ColumnStatistics<String> columnStatistics = new 
ColumnStatistics<>(statistics, TypeProtos.MinorType.VARCHAR);
     String serializedColumnStatistics = columnStatistics.jsonString();
 
-    ColumnStatistics deserialized = 
ColumnStatistics.of(serializedColumnStatistics);
+    ColumnStatistics<?> deserialized = 
ColumnStatistics.of(serializedColumnStatistics);
 
     assertEquals("Type was incorrectly deserialized",
         columnStatistics.getComparatorType(),
         deserialized.getComparatorType());
 
-    for (StatisticsHolder statistic : statistics) {
+    for (StatisticsHolder<?> statistic : statistics) {
       assertEquals("Statistics kind was incorrectly deserialized",
           statistic.getStatisticsKind().isExact(),
           deserialized.containsExact(statistic.getStatisticsKind()));
@@ -120,7 +120,7 @@ public class MetadataSerDeTest extends BaseTest {
   }
 
   private <T> void checkStatisticsHolderSerialization(T statisticsValue,
-      BaseStatisticsKind statisticsKind, String expectedString) {
+      BaseStatisticsKind<?> statisticsKind, String expectedString) {
     StatisticsHolder<T> statisticsHolder =
         new StatisticsHolder<>(statisticsValue, statisticsKind);
     String serializedStatisticsHolder = statisticsHolder.jsonString();
@@ -131,10 +131,10 @@ public class MetadataSerDeTest extends BaseTest {
   }
 
   private <T> void checkStatisticsHolderDeserialization(T statisticsValue,
-      BaseStatisticsKind statisticsKind) {
+      BaseStatisticsKind<?> statisticsKind) {
     StatisticsHolder<T> rowCount =
         new StatisticsHolder<>(statisticsValue, statisticsKind);
-    StatisticsHolder deserializedRowCount = 
StatisticsHolder.of(rowCount.jsonString());
+    StatisticsHolder<?> deserializedRowCount = 
StatisticsHolder.of(rowCount.jsonString());
 
     assertTrue("Statistics value was incorrectly deserialized",
         Objects.deepEquals(rowCount.getStatisticsValue(), 
deserializedRowCount.getStatisticsValue()));
@@ -142,7 +142,7 @@ public class MetadataSerDeTest extends BaseTest {
     assertStatisticsKindsEquals(rowCount, deserializedRowCount);
   }
 
-  private <T> void assertStatisticsKindsEquals(StatisticsHolder<T> expected, 
StatisticsHolder actual) {
+  private <T> void assertStatisticsKindsEquals(StatisticsHolder<T> expected, 
StatisticsHolder<?> actual) {
     assertEquals("isExact statistics kind was incorrectly deserialized",
         expected.getStatisticsKind().isExact(),
         actual.getStatisticsKind().isExact());

Reply via email to